LCOV - code coverage report
Current view: top level - src/backend/commands - publicationcmds.c (source / functions) Coverage Total Hit
Test: PostgreSQL 20devel Lines: 92.9 % 762 708
Test Date: 2026-07-03 19:57:34 Functions: 97.0 % 33 32
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
Branches: 78.9 % 649 512

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  *
       3                 :             :  * publicationcmds.c
       4                 :             :  *      publication manipulation
       5                 :             :  *
       6                 :             :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7                 :             :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :             :  *
       9                 :             :  * IDENTIFICATION
      10                 :             :  *      src/backend/commands/publicationcmds.c
      11                 :             :  *
      12                 :             :  *-------------------------------------------------------------------------
      13                 :             :  */
      14                 :             : 
      15                 :             : #include "postgres.h"
      16                 :             : 
      17                 :             : #include "access/htup_details.h"
      18                 :             : #include "access/table.h"
      19                 :             : #include "access/xact.h"
      20                 :             : #include "catalog/catalog.h"
      21                 :             : #include "catalog/indexing.h"
      22                 :             : #include "catalog/namespace.h"
      23                 :             : #include "catalog/objectaccess.h"
      24                 :             : #include "catalog/objectaddress.h"
      25                 :             : #include "catalog/pg_database.h"
      26                 :             : #include "catalog/pg_inherits.h"
      27                 :             : #include "catalog/pg_namespace.h"
      28                 :             : #include "catalog/pg_proc.h"
      29                 :             : #include "catalog/pg_publication.h"
      30                 :             : #include "catalog/pg_publication_namespace.h"
      31                 :             : #include "catalog/pg_publication_rel.h"
      32                 :             : #include "commands/defrem.h"
      33                 :             : #include "commands/event_trigger.h"
      34                 :             : #include "commands/publicationcmds.h"
      35                 :             : #include "miscadmin.h"
      36                 :             : #include "nodes/nodeFuncs.h"
      37                 :             : #include "parser/parse_clause.h"
      38                 :             : #include "parser/parse_collate.h"
      39                 :             : #include "parser/parse_relation.h"
      40                 :             : #include "rewrite/rewriteHandler.h"
      41                 :             : #include "storage/lmgr.h"
      42                 :             : #include "utils/acl.h"
      43                 :             : #include "utils/builtins.h"
      44                 :             : #include "utils/inval.h"
      45                 :             : #include "utils/lsyscache.h"
      46                 :             : #include "utils/rel.h"
      47                 :             : #include "utils/syscache.h"
      48                 :             : #include "utils/varlena.h"
      49                 :             : 
      50                 :             : 
      51                 :             : /*
      52                 :             :  * Information used to validate the columns in the row filter expression. See
      53                 :             :  * contain_invalid_rfcolumn_walker for details.
      54                 :             :  */
      55                 :             : typedef struct rf_context
      56                 :             : {
      57                 :             :     Bitmapset  *bms_replident;  /* bitset of replica identity columns */
      58                 :             :     bool        pubviaroot;     /* true if we are validating the parent
      59                 :             :                                  * relation's row filter */
      60                 :             :     Oid         relid;          /* relid of the relation */
      61                 :             :     Oid         parentid;       /* relid of the parent relation */
      62                 :             : } rf_context;
      63                 :             : 
      64                 :             : static List *OpenTableList(List *tables);
      65                 :             : static void CloseTableList(List *rels);
      66                 :             : static void LockSchemaList(List *schemalist);
      67                 :             : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
      68                 :             :                                  AlterPublicationStmt *stmt);
      69                 :             : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
      70                 :             : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
      71                 :             :                                   AlterPublicationStmt *stmt);
      72                 :             : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
      73                 :             : static char defGetGeneratedColsOption(DefElem *def);
      74                 :             : 
      75                 :             : 
      76                 :             : static void
      77                 :         711 : parse_publication_options(ParseState *pstate,
      78                 :             :                           List *options,
      79                 :             :                           bool *publish_given,
      80                 :             :                           PublicationActions *pubactions,
      81                 :             :                           bool *publish_via_partition_root_given,
      82                 :             :                           bool *publish_via_partition_root,
      83                 :             :                           bool *publish_generated_columns_given,
      84                 :             :                           char *publish_generated_columns)
      85                 :             : {
      86                 :             :     ListCell   *lc;
      87                 :             : 
      88                 :         711 :     *publish_given = false;
      89                 :         711 :     *publish_via_partition_root_given = false;
      90                 :         711 :     *publish_generated_columns_given = false;
      91                 :             : 
      92                 :             :     /* defaults */
      93                 :         711 :     pubactions->pubinsert = true;
      94                 :         711 :     pubactions->pubupdate = true;
      95                 :         711 :     pubactions->pubdelete = true;
      96                 :         711 :     pubactions->pubtruncate = true;
      97                 :         711 :     *publish_via_partition_root = false;
      98                 :         711 :     *publish_generated_columns = PUBLISH_GENCOLS_NONE;
      99                 :             : 
     100                 :             :     /* Parse options */
     101   [ +  +  +  +  :         975 :     foreach(lc, options)
                   +  + ]
     102                 :             :     {
     103                 :         288 :         DefElem    *defel = (DefElem *) lfirst(lc);
     104                 :             : 
     105         [ +  + ]:         288 :         if (strcmp(defel->defname, "publish") == 0)
     106                 :             :         {
     107                 :             :             char       *publish;
     108                 :             :             List       *publish_list;
     109                 :             :             ListCell   *lc2;
     110                 :             : 
     111         [ -  + ]:          89 :             if (*publish_given)
     112                 :           0 :                 errorConflictingDefElem(defel, pstate);
     113                 :             : 
     114                 :             :             /*
     115                 :             :              * If publish option was given only the explicitly listed actions
     116                 :             :              * should be published.
     117                 :             :              */
     118                 :          89 :             pubactions->pubinsert = false;
     119                 :          89 :             pubactions->pubupdate = false;
     120                 :          89 :             pubactions->pubdelete = false;
     121                 :          89 :             pubactions->pubtruncate = false;
     122                 :             : 
     123                 :          89 :             *publish_given = true;
     124                 :             : 
     125                 :             :             /*
     126                 :             :              * SplitIdentifierString destructively modifies its input, so make
     127                 :             :              * a copy so we don't modify the memory of the executing statement
     128                 :             :              */
     129                 :          89 :             publish = pstrdup(defGetString(defel));
     130                 :             : 
     131         [ -  + ]:          89 :             if (!SplitIdentifierString(publish, ',', &publish_list))
     132         [ #  # ]:           0 :                 ereport(ERROR,
     133                 :             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     134                 :             :                          errmsg("invalid list syntax in parameter \"%s\"",
     135                 :             :                                 "publish")));
     136                 :             : 
     137                 :             :             /* Process the option list. */
     138   [ +  +  +  +  :         208 :             foreach(lc2, publish_list)
                   +  + ]
     139                 :             :             {
     140                 :         123 :                 char       *publish_opt = (char *) lfirst(lc2);
     141                 :             : 
     142         [ +  + ]:         123 :                 if (strcmp(publish_opt, "insert") == 0)
     143                 :          79 :                     pubactions->pubinsert = true;
     144         [ +  + ]:          44 :                 else if (strcmp(publish_opt, "update") == 0)
     145                 :          18 :                     pubactions->pubupdate = true;
     146         [ +  + ]:          26 :                 else if (strcmp(publish_opt, "delete") == 0)
     147                 :          11 :                     pubactions->pubdelete = true;
     148         [ +  + ]:          15 :                 else if (strcmp(publish_opt, "truncate") == 0)
     149                 :          11 :                     pubactions->pubtruncate = true;
     150                 :             :                 else
     151         [ +  - ]:           4 :                     ereport(ERROR,
     152                 :             :                             (errcode(ERRCODE_SYNTAX_ERROR),
     153                 :             :                              errmsg("unrecognized value for publication option \"%s\": \"%s\"",
     154                 :             :                                     "publish", publish_opt)));
     155                 :             :             }
     156                 :             :         }
     157         [ +  + ]:         199 :         else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
     158                 :             :         {
     159         [ +  + ]:         142 :             if (*publish_via_partition_root_given)
     160                 :           4 :                 errorConflictingDefElem(defel, pstate);
     161                 :         138 :             *publish_via_partition_root_given = true;
     162                 :         138 :             *publish_via_partition_root = defGetBoolean(defel);
     163                 :             :         }
     164         [ +  + ]:          57 :         else if (strcmp(defel->defname, "publish_generated_columns") == 0)
     165                 :             :         {
     166         [ +  + ]:          53 :             if (*publish_generated_columns_given)
     167                 :           4 :                 errorConflictingDefElem(defel, pstate);
     168                 :          49 :             *publish_generated_columns_given = true;
     169                 :          49 :             *publish_generated_columns = defGetGeneratedColsOption(defel);
     170                 :             :         }
     171                 :             :         else
     172         [ +  - ]:           4 :             ereport(ERROR,
     173                 :             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     174                 :             :                      errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
     175                 :             :     }
     176                 :         687 : }
     177                 :             : 
     178                 :             : /*
     179                 :             :  * Convert the PublicationObjSpecType list into schema oid list and
     180                 :             :  * PublicationTable list.
     181                 :             :  */
     182                 :             : static void
     183                 :        1300 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
     184                 :             :                            List **rels, List **exceptrels, List **schemas)
     185                 :             : {
     186                 :             :     ListCell   *cell;
     187                 :             :     PublicationObjSpec *pubobj;
     188                 :             : 
     189         [ +  + ]:        1300 :     if (!pubobjspec_list)
     190                 :         191 :         return;
     191                 :             : 
     192   [ +  -  +  +  :        2370 :     foreach(cell, pubobjspec_list)
                   +  + ]
     193                 :             :     {
     194                 :             :         Oid         schemaid;
     195                 :             :         List       *search_path;
     196                 :             : 
     197                 :        1285 :         pubobj = (PublicationObjSpec *) lfirst(cell);
     198                 :             : 
     199   [ +  +  +  +  :        1285 :         switch (pubobj->pubobjtype)
                      - ]
     200                 :             :         {
     201                 :          91 :             case PUBLICATIONOBJ_EXCEPT_TABLE:
     202                 :          91 :                 pubobj->pubtable->except = true;
     203                 :          91 :                 *exceptrels = lappend(*exceptrels, pubobj->pubtable);
     204                 :          91 :                 break;
     205                 :         930 :             case PUBLICATIONOBJ_TABLE:
     206                 :         930 :                 pubobj->pubtable->except = false;
     207                 :         930 :                 *rels = lappend(*rels, pubobj->pubtable);
     208                 :         930 :                 break;
     209                 :         248 :             case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
     210                 :         248 :                 schemaid = get_namespace_oid(pubobj->name, false);
     211                 :             : 
     212                 :             :                 /* Filter out duplicates if user specifies "sch1, sch1" */
     213                 :         228 :                 *schemas = list_append_unique_oid(*schemas, schemaid);
     214                 :         228 :                 break;
     215                 :          16 :             case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
     216                 :          16 :                 search_path = fetch_search_path(false);
     217         [ +  + ]:          16 :                 if (search_path == NIL) /* nothing valid in search_path? */
     218         [ +  - ]:           4 :                     ereport(ERROR,
     219                 :             :                             errcode(ERRCODE_UNDEFINED_SCHEMA),
     220                 :             :                             errmsg("no schema has been selected for CURRENT_SCHEMA"));
     221                 :             : 
     222                 :          12 :                 schemaid = linitial_oid(search_path);
     223                 :          12 :                 list_free(search_path);
     224                 :             : 
     225                 :             :                 /* Filter out duplicates if user specifies "sch1, sch1" */
     226                 :          12 :                 *schemas = list_append_unique_oid(*schemas, schemaid);
     227                 :          12 :                 break;
     228                 :           0 :             default:
     229                 :             :                 /* shouldn't happen */
     230         [ #  # ]:           0 :                 elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
     231                 :             :                 break;
     232                 :             :         }
     233                 :             :     }
     234                 :             : }
     235                 :             : 
     236                 :             : /*
     237                 :             :  * Returns true if any of the columns used in the row filter WHERE expression is
     238                 :             :  * not part of REPLICA IDENTITY, false otherwise.
     239                 :             :  */
     240                 :             : static bool
     241                 :         172 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
     242                 :             : {
     243         [ -  + ]:         172 :     if (node == NULL)
     244                 :           0 :         return false;
     245                 :             : 
     246         [ +  + ]:         172 :     if (IsA(node, Var))
     247                 :             :     {
     248                 :          69 :         Var        *var = (Var *) node;
     249                 :          69 :         AttrNumber  attnum = var->varattno;
     250                 :             : 
     251                 :             :         /*
     252                 :             :          * If pubviaroot is true, we are validating the row filter of the
     253                 :             :          * parent table, but the bitmap contains the replica identity
     254                 :             :          * information of the child table. So, get the column number of the
     255                 :             :          * child table as parent and child column order could be different.
     256                 :             :          */
     257         [ +  + ]:          69 :         if (context->pubviaroot)
     258                 :             :         {
     259                 :          10 :             char       *colname = get_attname(context->parentid, attnum, false);
     260                 :             : 
     261                 :          10 :             attnum = get_attnum(context->relid, colname);
     262                 :             :         }
     263                 :             : 
     264         [ +  + ]:          69 :         if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
     265                 :          69 :                            context->bms_replident))
     266                 :          40 :             return true;
     267                 :             :     }
     268                 :             : 
     269                 :         132 :     return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
     270                 :             :                                   context);
     271                 :             : }
     272                 :             : 
     273                 :             : /*
     274                 :             :  * Check if all columns referenced in the filter expression are part of the
     275                 :             :  * REPLICA IDENTITY index or not.
     276                 :             :  *
     277                 :             :  * Returns true if any invalid column is found.
     278                 :             :  */
     279                 :             : bool
     280                 :         434 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
     281                 :             :                                bool pubviaroot)
     282                 :             : {
     283                 :             :     HeapTuple   rftuple;
     284                 :         434 :     Oid         relid = RelationGetRelid(relation);
     285                 :         434 :     Oid         publish_as_relid = RelationGetRelid(relation);
     286                 :         434 :     bool        result = false;
     287                 :             :     Datum       rfdatum;
     288                 :             :     bool        rfisnull;
     289                 :             : 
     290                 :             :     /*
     291                 :             :      * FULL means all columns are in the REPLICA IDENTITY, so all columns are
     292                 :             :      * allowed in the row filter and we can skip the validation.
     293                 :             :      */
     294         [ +  + ]:         434 :     if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     295                 :         113 :         return false;
     296                 :             : 
     297                 :             :     /*
     298                 :             :      * For a partition, if pubviaroot is true, find the topmost ancestor that
     299                 :             :      * is published via this publication as we need to use its row filter
     300                 :             :      * expression to filter the partition's changes.
     301                 :             :      *
     302                 :             :      * Note that even though the row filter used is for an ancestor, the
     303                 :             :      * REPLICA IDENTITY used will be for the actual child table.
     304                 :             :      */
     305   [ +  +  +  + ]:         321 :     if (pubviaroot && relation->rd_rel->relispartition)
     306                 :             :     {
     307                 :             :         publish_as_relid
     308                 :          68 :             = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
     309                 :             : 
     310         [ +  + ]:          68 :         if (!OidIsValid(publish_as_relid))
     311                 :           3 :             publish_as_relid = relid;
     312                 :             :     }
     313                 :             : 
     314                 :         321 :     rftuple = SearchSysCache2(PUBLICATIONRELMAP,
     315                 :             :                               ObjectIdGetDatum(publish_as_relid),
     316                 :             :                               ObjectIdGetDatum(pubid));
     317                 :             : 
     318         [ +  + ]:         321 :     if (!HeapTupleIsValid(rftuple))
     319                 :          34 :         return false;
     320                 :             : 
     321                 :         287 :     rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
     322                 :             :                               Anum_pg_publication_rel_prqual,
     323                 :             :                               &rfisnull);
     324                 :             : 
     325         [ +  + ]:         287 :     if (!rfisnull)
     326                 :             :     {
     327                 :          68 :         rf_context  context = {0};
     328                 :             :         Node       *rfnode;
     329                 :          68 :         Bitmapset  *bms = NULL;
     330                 :             : 
     331                 :          68 :         context.pubviaroot = pubviaroot;
     332                 :          68 :         context.parentid = publish_as_relid;
     333                 :          68 :         context.relid = relid;
     334                 :             : 
     335                 :             :         /* Remember columns that are part of the REPLICA IDENTITY */
     336                 :          68 :         bms = RelationGetIndexAttrBitmap(relation,
     337                 :             :                                          INDEX_ATTR_BITMAP_IDENTITY_KEY);
     338                 :             : 
     339                 :          68 :         context.bms_replident = bms;
     340                 :          68 :         rfnode = stringToNode(TextDatumGetCString(rfdatum));
     341                 :          68 :         result = contain_invalid_rfcolumn_walker(rfnode, &context);
     342                 :             :     }
     343                 :             : 
     344                 :         287 :     ReleaseSysCache(rftuple);
     345                 :             : 
     346                 :         287 :     return result;
     347                 :             : }
     348                 :             : 
     349                 :             : /*
     350                 :             :  * Check for invalid columns in the publication table definition.
     351                 :             :  *
     352                 :             :  * This function evaluates two conditions:
     353                 :             :  *
     354                 :             :  * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
     355                 :             :  *    by the column list. If any column is missing, *invalid_column_list is set
     356                 :             :  *    to true.
     357                 :             :  * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
     358                 :             :  *    are published, either by being explicitly named in the column list or, if
     359                 :             :  *    no column list is specified, by setting the option
     360                 :             :  *    publish_generated_columns to stored. If any unpublished
     361                 :             :  *    generated column is found, *invalid_gen_col is set to true.
     362                 :             :  *
     363                 :             :  * Returns true if any of the above conditions are not met.
     364                 :             :  */
     365                 :             : bool
     366                 :         564 : pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
     367                 :             :                             bool pubviaroot, char pubgencols_type,
     368                 :             :                             bool *invalid_column_list,
     369                 :             :                             bool *invalid_gen_col)
     370                 :             : {
     371                 :         564 :     Oid         relid = RelationGetRelid(relation);
     372                 :         564 :     Oid         publish_as_relid = RelationGetRelid(relation);
     373                 :             :     Bitmapset  *idattrs;
     374                 :         564 :     Bitmapset  *columns = NULL;
     375                 :         564 :     TupleDesc   desc = RelationGetDescr(relation);
     376                 :             :     Publication *pub;
     377                 :             :     int         x;
     378                 :             : 
     379                 :         564 :     *invalid_column_list = false;
     380                 :         564 :     *invalid_gen_col = false;
     381                 :             : 
     382                 :             :     /*
     383                 :             :      * For a partition, if pubviaroot is true, find the topmost ancestor that
     384                 :             :      * is published via this publication as we need to use its column list for
     385                 :             :      * the changes.
     386                 :             :      *
     387                 :             :      * Note that even though the column list used is for an ancestor, the
     388                 :             :      * REPLICA IDENTITY used will be for the actual child table.
     389                 :             :      */
     390   [ +  +  +  + ]:         564 :     if (pubviaroot && relation->rd_rel->relispartition)
     391                 :             :     {
     392                 :          97 :         publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
     393                 :             : 
     394         [ +  + ]:          97 :         if (!OidIsValid(publish_as_relid))
     395                 :          18 :             publish_as_relid = relid;
     396                 :             :     }
     397                 :             : 
     398                 :             :     /* Fetch the column list */
     399                 :         564 :     pub = GetPublication(pubid);
     400                 :         564 :     check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
     401                 :             : 
     402         [ +  + ]:         564 :     if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     403                 :             :     {
     404                 :             :         /* With REPLICA IDENTITY FULL, no column list is allowed. */
     405                 :         126 :         *invalid_column_list = (columns != NULL);
     406                 :             : 
     407                 :             :         /*
     408                 :             :          * As we don't allow a column list with REPLICA IDENTITY FULL, the
     409                 :             :          * publish_generated_columns option must be set to stored if the table
     410                 :             :          * has any stored generated columns.
     411                 :             :          */
     412         [ +  + ]:         126 :         if (pubgencols_type != PUBLISH_GENCOLS_STORED &&
     413         [ +  + ]:         118 :             relation->rd_att->constr &&
     414         [ +  + ]:          54 :             relation->rd_att->constr->has_generated_stored)
     415                 :           4 :             *invalid_gen_col = true;
     416                 :             : 
     417                 :             :         /*
     418                 :             :          * Virtual generated columns are currently not supported for logical
     419                 :             :          * replication at all.
     420                 :             :          */
     421         [ +  + ]:         126 :         if (relation->rd_att->constr &&
     422         [ +  + ]:          62 :             relation->rd_att->constr->has_generated_virtual)
     423                 :           8 :             *invalid_gen_col = true;
     424                 :             : 
     425   [ +  +  -  + ]:         126 :         if (*invalid_gen_col && *invalid_column_list)
     426                 :           0 :             return true;
     427                 :             :     }
     428                 :             : 
     429                 :             :     /* Remember columns that are part of the REPLICA IDENTITY */
     430                 :         564 :     idattrs = RelationGetIndexAttrBitmap(relation,
     431                 :             :                                          INDEX_ATTR_BITMAP_IDENTITY_KEY);
     432                 :             : 
     433                 :             :     /*
     434                 :             :      * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
     435                 :             :      * (to handle system columns the usual way), while column list does not
     436                 :             :      * use offset, so we can't do bms_is_subset(). Instead, we have to loop
     437                 :             :      * over the idattrs and check all of them are in the list.
     438                 :             :      */
     439                 :         564 :     x = -1;
     440         [ +  + ]:         954 :     while ((x = bms_next_member(idattrs, x)) >= 0)
     441                 :             :     {
     442                 :         394 :         AttrNumber  attnum = (x + FirstLowInvalidHeapAttributeNumber);
     443                 :         394 :         Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
     444                 :             : 
     445         [ +  + ]:         394 :         if (columns == NULL)
     446                 :             :         {
     447                 :             :             /*
     448                 :             :              * The publish_generated_columns option must be set to stored if
     449                 :             :              * the REPLICA IDENTITY contains any stored generated column.
     450                 :             :              */
     451   [ +  +  +  - ]:         259 :             if (att->attgenerated == ATTRIBUTE_GENERATED_STORED && pubgencols_type != PUBLISH_GENCOLS_STORED)
     452                 :             :             {
     453                 :           4 :                 *invalid_gen_col = true;
     454                 :           4 :                 break;
     455                 :             :             }
     456                 :             : 
     457                 :             :             /*
     458                 :             :              * The equivalent setting for virtual generated columns does not
     459                 :             :              * exist yet.
     460                 :             :              */
     461         [ -  + ]:         255 :             if (att->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
     462                 :             :             {
     463                 :           0 :                 *invalid_gen_col = true;
     464                 :           0 :                 break;
     465                 :             :             }
     466                 :             : 
     467                 :             :             /* Skip validating the column list since it is not defined */
     468                 :         255 :             continue;
     469                 :             :         }
     470                 :             : 
     471                 :             :         /*
     472                 :             :          * If pubviaroot is true, we are validating the column list of the
     473                 :             :          * parent table, but the bitmap contains the replica identity
     474                 :             :          * information of the child table. The parent/child attnums may not
     475                 :             :          * match, so translate them to the parent - get the attname from the
     476                 :             :          * child, and look it up in the parent.
     477                 :             :          */
     478         [ +  + ]:         135 :         if (pubviaroot)
     479                 :             :         {
     480                 :             :             /* attribute name in the child table */
     481                 :          51 :             char       *colname = get_attname(relid, attnum, false);
     482                 :             : 
     483                 :             :             /*
     484                 :             :              * Determine the attnum for the attribute name in parent (we are
     485                 :             :              * using the column list defined on the parent).
     486                 :             :              */
     487                 :          51 :             attnum = get_attnum(publish_as_relid, colname);
     488                 :             :         }
     489                 :             : 
     490                 :             :         /* replica identity column, not covered by the column list */
     491                 :         135 :         *invalid_column_list |= !bms_is_member(attnum, columns);
     492                 :             : 
     493   [ +  +  -  + ]:         135 :         if (*invalid_column_list && *invalid_gen_col)
     494                 :           0 :             break;
     495                 :             :     }
     496                 :             : 
     497                 :         564 :     bms_free(columns);
     498                 :         564 :     bms_free(idattrs);
     499                 :             : 
     500   [ +  +  +  + ]:         564 :     return *invalid_column_list || *invalid_gen_col;
     501                 :             : }
     502                 :             : 
     503                 :             : /*
     504                 :             :  * Invalidate entries in the RelationSyncCache for relations included in the
     505                 :             :  * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
     506                 :             :  *
     507                 :             :  * If 'puballtables' is true, invalidate all cache entries.
     508                 :             :  */
     509                 :             : void
     510                 :          20 : InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
     511                 :             : {
     512         [ +  + ]:          20 :     if (puballtables)
     513                 :             :     {
     514                 :           3 :         CacheInvalidateRelSyncAll();
     515                 :             :     }
     516                 :             :     else
     517                 :             :     {
     518                 :          17 :         List       *relids = NIL;
     519                 :          17 :         List       *schemarelids = NIL;
     520                 :             : 
     521                 :             :         /*
     522                 :             :          * For partitioned tables, we must invalidate all partitions and
     523                 :             :          * itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as
     524                 :             :          * a target. However, WAL records for TRUNCATE specify both a root and
     525                 :             :          * its leaves.
     526                 :             :          */
     527                 :          17 :         relids = GetIncludedPublicationRelations(pubid,
     528                 :             :                                                  PUBLICATION_PART_ALL);
     529                 :          17 :         schemarelids = GetAllSchemaPublicationRelations(pubid,
     530                 :             :                                                         PUBLICATION_PART_ALL);
     531                 :             : 
     532                 :          17 :         relids = list_concat_unique_oid(relids, schemarelids);
     533                 :             : 
     534                 :             :         /* Invalidate the relsyncache */
     535   [ +  +  +  +  :          37 :         foreach_oid(relid, relids)
                   +  + ]
     536                 :           3 :             CacheInvalidateRelSync(relid);
     537                 :             :     }
     538                 :             : 
     539                 :          20 :     return;
     540                 :             : }
     541                 :             : 
     542                 :             : /* check_functions_in_node callback */
     543                 :             : static bool
     544                 :         288 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
     545                 :             : {
     546   [ +  +  +  + ]:         288 :     return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
     547                 :             :             func_id >= FirstNormalObjectId);
     548                 :             : }
     549                 :             : 
     550                 :             : /*
     551                 :             :  * The row filter walker checks if the row filter expression is a "simple
     552                 :             :  * expression".
     553                 :             :  *
     554                 :             :  * It allows only simple or compound expressions such as:
     555                 :             :  * - (Var Op Const)
     556                 :             :  * - (Var Op Var)
     557                 :             :  * - (Var Op Const) AND/OR (Var Op Const)
     558                 :             :  * - etc
     559                 :             :  * (where Var is a column of the table this filter belongs to)
     560                 :             :  *
     561                 :             :  * The simple expression has the following restrictions:
     562                 :             :  * - User-defined operators are not allowed;
     563                 :             :  * - User-defined functions are not allowed;
     564                 :             :  * - User-defined types are not allowed;
     565                 :             :  * - User-defined collations are not allowed;
     566                 :             :  * - Non-immutable built-in functions are not allowed;
     567                 :             :  * - System columns are not allowed.
     568                 :             :  *
     569                 :             :  * NOTES
     570                 :             :  *
     571                 :             :  * We don't allow user-defined functions/operators/types/collations because
     572                 :             :  * (a) if a user drops a user-defined object used in a row filter expression or
     573                 :             :  * if there is any other error while using it, the logical decoding
     574                 :             :  * infrastructure won't be able to recover from such an error even if the
     575                 :             :  * object is recreated again because a historic snapshot is used to evaluate
     576                 :             :  * the row filter;
     577                 :             :  * (b) a user-defined function can be used to access tables that could have
     578                 :             :  * unpleasant results because a historic snapshot is used. That's why only
     579                 :             :  * immutable built-in functions are allowed in row filter expressions.
     580                 :             :  *
     581                 :             :  * We don't allow system columns because currently, we don't have that
     582                 :             :  * information in the tuple passed to downstream. Also, as we don't replicate
     583                 :             :  * those to subscribers, there doesn't seem to be a need for a filter on those
     584                 :             :  * columns.
     585                 :             :  *
     586                 :             :  * We can allow other node types after more analysis and testing.
     587                 :             :  */
     588                 :             : static bool
     589                 :         952 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
     590                 :             : {
     591                 :         952 :     char       *errdetail_msg = NULL;
     592                 :             : 
     593         [ +  + ]:         952 :     if (node == NULL)
     594                 :           4 :         return false;
     595                 :             : 
     596   [ +  +  +  +  :         948 :     switch (nodeTag(node))
                   +  + ]
     597                 :             :     {
     598                 :         257 :         case T_Var:
     599                 :             :             /* System columns are not allowed. */
     600         [ +  + ]:         257 :             if (((Var *) node)->varattno < InvalidAttrNumber)
     601                 :           4 :                 errdetail_msg = _("System columns are not allowed.");
     602                 :         257 :             break;
     603                 :         259 :         case T_OpExpr:
     604                 :             :         case T_DistinctExpr:
     605                 :             :         case T_NullIfExpr:
     606                 :             :             /* OK, except user-defined operators are not allowed. */
     607         [ +  + ]:         259 :             if (((OpExpr *) node)->opno >= FirstNormalObjectId)
     608                 :           4 :                 errdetail_msg = _("User-defined operators are not allowed.");
     609                 :         259 :             break;
     610                 :           4 :         case T_ScalarArrayOpExpr:
     611                 :             :             /* OK, except user-defined operators are not allowed. */
     612         [ -  + ]:           4 :             if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
     613                 :           0 :                 errdetail_msg = _("User-defined operators are not allowed.");
     614                 :             : 
     615                 :             :             /*
     616                 :             :              * We don't need to check the hashfuncid and negfuncid of
     617                 :             :              * ScalarArrayOpExpr as those functions are only built for a
     618                 :             :              * subquery.
     619                 :             :              */
     620                 :           4 :             break;
     621                 :           4 :         case T_RowCompareExpr:
     622                 :             :             {
     623                 :             :                 ListCell   *opid;
     624                 :             : 
     625                 :             :                 /* OK, except user-defined operators are not allowed. */
     626   [ +  -  +  +  :          12 :                 foreach(opid, ((RowCompareExpr *) node)->opnos)
                   +  + ]
     627                 :             :                 {
     628         [ -  + ]:           8 :                     if (lfirst_oid(opid) >= FirstNormalObjectId)
     629                 :             :                     {
     630                 :           0 :                         errdetail_msg = _("User-defined operators are not allowed.");
     631                 :           0 :                         break;
     632                 :             :                     }
     633                 :             :                 }
     634                 :             :             }
     635                 :           4 :             break;
     636                 :         420 :         case T_Const:
     637                 :             :         case T_FuncExpr:
     638                 :             :         case T_BoolExpr:
     639                 :             :         case T_RelabelType:
     640                 :             :         case T_CollateExpr:
     641                 :             :         case T_CaseExpr:
     642                 :             :         case T_CaseTestExpr:
     643                 :             :         case T_ArrayExpr:
     644                 :             :         case T_RowExpr:
     645                 :             :         case T_CoalesceExpr:
     646                 :             :         case T_MinMaxExpr:
     647                 :             :         case T_XmlExpr:
     648                 :             :         case T_NullTest:
     649                 :             :         case T_BooleanTest:
     650                 :             :         case T_List:
     651                 :             :             /* OK, supported */
     652                 :         420 :             break;
     653                 :           4 :         default:
     654                 :           4 :             errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
     655                 :           4 :             break;
     656                 :             :     }
     657                 :             : 
     658                 :             :     /*
     659                 :             :      * For all the supported nodes, if we haven't already found a problem,
     660                 :             :      * check the types, functions, and collations used in it.  We check List
     661                 :             :      * by walking through each element.
     662                 :             :      */
     663   [ +  +  +  + ]:         948 :     if (!errdetail_msg && !IsA(node, List))
     664                 :             :     {
     665         [ +  + ]:         900 :         if (exprType(node) >= FirstNormalObjectId)
     666                 :           4 :             errdetail_msg = _("User-defined types are not allowed.");
     667         [ +  + ]:         896 :         else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
     668                 :             :                                          pstate))
     669                 :           8 :             errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
     670   [ +  -  +  + ]:        1776 :         else if (exprCollation(node) >= FirstNormalObjectId ||
     671                 :         888 :                  exprInputCollation(node) >= FirstNormalObjectId)
     672                 :           4 :             errdetail_msg = _("User-defined collations are not allowed.");
     673                 :             :     }
     674                 :             : 
     675                 :             :     /*
     676                 :             :      * If we found a problem in this node, throw error now. Otherwise keep
     677                 :             :      * going.
     678                 :             :      */
     679         [ +  + ]:         948 :     if (errdetail_msg)
     680         [ +  - ]:          28 :         ereport(ERROR,
     681                 :             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     682                 :             :                  errmsg("invalid publication WHERE expression"),
     683                 :             :                  errdetail_internal("%s", errdetail_msg),
     684                 :             :                  parser_errposition(pstate, exprLocation(node))));
     685                 :             : 
     686                 :         920 :     return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
     687                 :             :                                   pstate);
     688                 :             : }
     689                 :             : 
     690                 :             : /*
     691                 :             :  * Check if the row filter expression is a "simple expression".
     692                 :             :  *
     693                 :             :  * See check_simple_rowfilter_expr_walker for details.
     694                 :             :  */
     695                 :             : static bool
     696                 :         250 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
     697                 :             : {
     698                 :         250 :     return check_simple_rowfilter_expr_walker(node, pstate);
     699                 :             : }
     700                 :             : 
     701                 :             : /*
     702                 :             :  * Transform the publication WHERE expression for all the relations in the list,
     703                 :             :  * ensuring it is coerced to boolean and necessary collation information is
     704                 :             :  * added if required, and add a new nsitem/RTE for the associated relation to
     705                 :             :  * the ParseState's namespace list.
     706                 :             :  *
     707                 :             :  * Also check the publication row filter expression and throw an error if
     708                 :             :  * anything not permitted or unexpected is encountered.
     709                 :             :  */
     710                 :             : static void
     711                 :         772 : TransformPubWhereClauses(List *tables, const char *queryString,
     712                 :             :                          bool pubviaroot)
     713                 :             : {
     714                 :             :     ListCell   *lc;
     715                 :             : 
     716   [ +  +  +  +  :        1549 :     foreach(lc, tables)
                   +  + ]
     717                 :             :     {
     718                 :             :         ParseNamespaceItem *nsitem;
     719                 :         817 :         Node       *whereclause = NULL;
     720                 :             :         ParseState *pstate;
     721                 :         817 :         PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
     722                 :             : 
     723         [ +  + ]:         817 :         if (pri->whereClause == NULL)
     724                 :         555 :             continue;
     725                 :             : 
     726                 :             :         /*
     727                 :             :          * If the publication doesn't publish changes via the root partitioned
     728                 :             :          * table, the partition's row filter will be used. So disallow using
     729                 :             :          * WHERE clause on partitioned table in this case.
     730                 :             :          */
     731         [ +  + ]:         262 :         if (!pubviaroot &&
     732         [ +  + ]:         240 :             pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     733         [ +  - ]:           4 :             ereport(ERROR,
     734                 :             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     735                 :             :                      errmsg("cannot use publication WHERE clause for relation \"%s\"",
     736                 :             :                             RelationGetRelationName(pri->relation)),
     737                 :             :                      errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
     738                 :             :                                "publish_via_partition_root")));
     739                 :             : 
     740                 :             :         /*
     741                 :             :          * A fresh pstate is required so that we only have "this" table in its
     742                 :             :          * rangetable
     743                 :             :          */
     744                 :         258 :         pstate = make_parsestate(NULL);
     745                 :         258 :         pstate->p_sourcetext = queryString;
     746                 :         258 :         nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
     747                 :             :                                                AccessShareLock, NULL,
     748                 :             :                                                false, false);
     749                 :         258 :         addNSItemToQuery(pstate, nsitem, false, true, true);
     750                 :             : 
     751                 :         258 :         whereclause = transformWhereClause(pstate,
     752                 :         258 :                                            copyObject(pri->whereClause),
     753                 :             :                                            EXPR_KIND_WHERE,
     754                 :             :                                            "PUBLICATION WHERE");
     755                 :             : 
     756                 :             :         /* Fix up collation information */
     757                 :         250 :         assign_expr_collations(pstate, whereclause);
     758                 :             : 
     759                 :         250 :         whereclause = expand_generated_columns_in_expr(whereclause, pri->relation, 1);
     760                 :             : 
     761                 :             :         /*
     762                 :             :          * We allow only simple expressions in row filters. See
     763                 :             :          * check_simple_rowfilter_expr_walker.
     764                 :             :          */
     765                 :         250 :         check_simple_rowfilter_expr(whereclause, pstate);
     766                 :             : 
     767                 :         222 :         free_parsestate(pstate);
     768                 :             : 
     769                 :         222 :         pri->whereClause = whereclause;
     770                 :             :     }
     771                 :         732 : }
     772                 :             : 
     773                 :             : 
     774                 :             : /*
     775                 :             :  * Given a list of tables that are going to be added to a publication,
     776                 :             :  * verify that they fulfill the necessary preconditions, namely: no tables
     777                 :             :  * have a column list if any schema is published; and partitioned tables do
     778                 :             :  * not have column lists if publish_via_partition_root is not set.
     779                 :             :  *
     780                 :             :  * 'publish_schema' indicates that the publication contains any TABLES IN
     781                 :             :  * SCHEMA elements (newly added in this command, or preexisting).
     782                 :             :  * 'pubviaroot' is the value of publish_via_partition_root.
     783                 :             :  */
     784                 :             : static void
     785                 :         732 : CheckPubRelationColumnList(char *pubname, List *tables,
     786                 :             :                            bool publish_schema, bool pubviaroot)
     787                 :             : {
     788                 :             :     ListCell   *lc;
     789                 :             : 
     790   [ +  +  +  +  :        1489 :     foreach(lc, tables)
                   +  + ]
     791                 :             :     {
     792                 :         777 :         PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
     793                 :             : 
     794         [ +  + ]:         777 :         if (pri->columns == NIL)
     795                 :         517 :             continue;
     796                 :             : 
     797                 :             :         /*
     798                 :             :          * Disallow specifying column list if any schema is in the
     799                 :             :          * publication.
     800                 :             :          *
     801                 :             :          * XXX We could instead just forbid the case when the publication
     802                 :             :          * tries to publish the table with a column list and a schema for that
     803                 :             :          * table. However, if we do that then we need a restriction during
     804                 :             :          * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
     805                 :             :          * seem to be a good idea.
     806                 :             :          */
     807         [ +  + ]:         260 :         if (publish_schema)
     808         [ +  - ]:          16 :             ereport(ERROR,
     809                 :             :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     810                 :             :                     errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
     811                 :             :                            get_namespace_name(RelationGetNamespace(pri->relation)),
     812                 :             :                            RelationGetRelationName(pri->relation), pubname),
     813                 :             :                     errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
     814                 :             : 
     815                 :             :         /*
     816                 :             :          * If the publication doesn't publish changes via the root partitioned
     817                 :             :          * table, the partition's column list will be used. So disallow using
     818                 :             :          * a column list on the partitioned table in this case.
     819                 :             :          */
     820         [ +  + ]:         244 :         if (!pubviaroot &&
     821         [ +  + ]:         194 :             pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     822         [ +  - ]:           4 :             ereport(ERROR,
     823                 :             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     824                 :             :                      errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
     825                 :             :                             get_namespace_name(RelationGetNamespace(pri->relation)),
     826                 :             :                             RelationGetRelationName(pri->relation), pubname),
     827                 :             :                      errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
     828                 :             :                                "publish_via_partition_root")));
     829                 :             :     }
     830                 :         712 : }
     831                 :             : 
     832                 :             : /*
     833                 :             :  * Create new publication.
     834                 :             :  */
     835                 :             : ObjectAddress
     836                 :         635 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
     837                 :             : {
     838                 :             :     Relation    rel;
     839                 :             :     ObjectAddress myself;
     840                 :             :     Oid         puboid;
     841                 :             :     bool        nulls[Natts_pg_publication];
     842                 :             :     Datum       values[Natts_pg_publication];
     843                 :             :     HeapTuple   tup;
     844                 :             :     bool        publish_given;
     845                 :             :     PublicationActions pubactions;
     846                 :             :     bool        publish_via_partition_root_given;
     847                 :             :     bool        publish_via_partition_root;
     848                 :             :     bool        publish_generated_columns_given;
     849                 :             :     char        publish_generated_columns;
     850                 :             :     AclResult   aclresult;
     851                 :         635 :     List       *relations = NIL;
     852                 :         635 :     List       *exceptrelations = NIL;
     853                 :         635 :     List       *schemaidlist = NIL;
     854                 :             : 
     855                 :             :     /* must have CREATE privilege on database */
     856                 :         635 :     aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
     857         [ +  + ]:         635 :     if (aclresult != ACLCHECK_OK)
     858                 :           4 :         aclcheck_error(aclresult, OBJECT_DATABASE,
     859                 :           4 :                        get_database_name(MyDatabaseId));
     860                 :             : 
     861                 :             :     /* FOR ALL TABLES and FOR ALL SEQUENCES requires superuser */
     862         [ +  + ]:         631 :     if (!superuser())
     863                 :             :     {
     864   [ +  -  -  + ]:          15 :         if (stmt->for_all_tables || stmt->for_all_sequences)
     865         [ #  # ]:           0 :             ereport(ERROR,
     866                 :             :                     errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     867                 :             :                     errmsg("must be superuser to create a FOR ALL TABLES or ALL SEQUENCES publication"));
     868                 :             :     }
     869                 :             : 
     870                 :         631 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
     871                 :             : 
     872                 :             :     /* Check if name is used */
     873                 :         631 :     puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
     874                 :             :                              CStringGetDatum(stmt->pubname));
     875         [ +  + ]:         631 :     if (OidIsValid(puboid))
     876         [ +  - ]:           4 :         ereport(ERROR,
     877                 :             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     878                 :             :                  errmsg("publication \"%s\" already exists",
     879                 :             :                         stmt->pubname)));
     880                 :             : 
     881                 :             :     /* Form a tuple. */
     882                 :         627 :     memset(values, 0, sizeof(values));
     883                 :         627 :     memset(nulls, false, sizeof(nulls));
     884                 :             : 
     885                 :         627 :     values[Anum_pg_publication_pubname - 1] =
     886                 :         627 :         DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
     887                 :         627 :     values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
     888                 :             : 
     889                 :         627 :     parse_publication_options(pstate,
     890                 :             :                               stmt->options,
     891                 :             :                               &publish_given, &pubactions,
     892                 :             :                               &publish_via_partition_root_given,
     893                 :             :                               &publish_via_partition_root,
     894                 :             :                               &publish_generated_columns_given,
     895                 :             :                               &publish_generated_columns);
     896                 :             : 
     897         [ +  + ]:         603 :     if (stmt->for_all_sequences &&
     898   [ +  +  +  -  :          17 :         (publish_given || publish_via_partition_root_given ||
                   -  + ]
     899                 :             :          publish_generated_columns_given))
     900         [ +  - ]:           1 :         ereport(NOTICE,
     901                 :             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     902                 :             :                 errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
     903                 :             : 
     904                 :         603 :     puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
     905                 :             :                                 Anum_pg_publication_oid);
     906                 :         603 :     values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
     907                 :         603 :     values[Anum_pg_publication_puballtables - 1] =
     908                 :         603 :         BoolGetDatum(stmt->for_all_tables);
     909                 :         603 :     values[Anum_pg_publication_puballsequences - 1] =
     910                 :         603 :         BoolGetDatum(stmt->for_all_sequences);
     911                 :         603 :     values[Anum_pg_publication_pubinsert - 1] =
     912                 :         603 :         BoolGetDatum(pubactions.pubinsert);
     913                 :         603 :     values[Anum_pg_publication_pubupdate - 1] =
     914                 :         603 :         BoolGetDatum(pubactions.pubupdate);
     915                 :         603 :     values[Anum_pg_publication_pubdelete - 1] =
     916                 :         603 :         BoolGetDatum(pubactions.pubdelete);
     917                 :         603 :     values[Anum_pg_publication_pubtruncate - 1] =
     918                 :         603 :         BoolGetDatum(pubactions.pubtruncate);
     919                 :         603 :     values[Anum_pg_publication_pubviaroot - 1] =
     920                 :         603 :         BoolGetDatum(publish_via_partition_root);
     921                 :         603 :     values[Anum_pg_publication_pubgencols - 1] =
     922                 :         603 :         CharGetDatum(publish_generated_columns);
     923                 :             : 
     924                 :         603 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     925                 :             : 
     926                 :             :     /* Insert tuple into catalog. */
     927                 :         603 :     CatalogTupleInsert(rel, tup);
     928                 :         603 :     heap_freetuple(tup);
     929                 :             : 
     930                 :         603 :     recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
     931                 :             : 
     932                 :         603 :     ObjectAddressSet(myself, PublicationRelationId, puboid);
     933                 :             : 
     934                 :             :     /* Make the changes visible. */
     935                 :         603 :     CommandCounterIncrement();
     936                 :             : 
     937                 :             :     /* Associate objects with the publication. */
     938                 :         603 :     ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
     939                 :             :                                &exceptrelations, &schemaidlist);
     940                 :             : 
     941         [ +  + ]:         591 :     if (stmt->for_all_tables)
     942                 :             :     {
     943                 :             :         /* Process EXCEPT table list */
     944         [ +  + ]:         116 :         if (exceptrelations != NIL)
     945                 :             :         {
     946                 :             :             List       *rels;
     947                 :             : 
     948                 :          43 :             rels = OpenTableList(exceptrelations);
     949                 :          43 :             PublicationAddTables(puboid, rels, true, NULL);
     950                 :          39 :             CloseTableList(rels);
     951                 :             :         }
     952                 :             : 
     953                 :             :         /*
     954                 :             :          * Invalidate relcache so that publication info is rebuilt. Sequences
     955                 :             :          * publication doesn't require invalidation, as replica identity
     956                 :             :          * checks don't apply to them.
     957                 :             :          */
     958                 :         112 :         CacheInvalidateRelcacheAll();
     959                 :             :     }
     960         [ +  + ]:         475 :     else if (!stmt->for_all_sequences)
     961                 :             :     {
     962                 :             :         /* FOR TABLES IN SCHEMA requires superuser */
     963   [ +  +  +  + ]:         463 :         if (schemaidlist != NIL && !superuser())
     964         [ +  - ]:           4 :             ereport(ERROR,
     965                 :             :                     errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     966                 :             :                     errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
     967                 :             : 
     968         [ +  + ]:         459 :         if (relations != NIL)
     969                 :             :         {
     970                 :             :             List       *rels;
     971                 :             : 
     972                 :         310 :             rels = OpenTableList(relations);
     973                 :         290 :             TransformPubWhereClauses(rels, pstate->p_sourcetext,
     974                 :             :                                      publish_via_partition_root);
     975                 :             : 
     976                 :         274 :             CheckPubRelationColumnList(stmt->pubname, rels,
     977                 :             :                                        schemaidlist != NIL,
     978                 :             :                                        publish_via_partition_root);
     979                 :             : 
     980                 :         270 :             PublicationAddTables(puboid, rels, true, NULL);
     981                 :         249 :             CloseTableList(rels);
     982                 :             :         }
     983                 :             : 
     984         [ +  + ]:         398 :         if (schemaidlist != NIL)
     985                 :             :         {
     986                 :             :             /*
     987                 :             :              * Schema lock is held until the publication is created to prevent
     988                 :             :              * concurrent schema deletion.
     989                 :             :              */
     990                 :         104 :             LockSchemaList(schemaidlist);
     991                 :         104 :             PublicationAddSchemas(puboid, schemaidlist, true, NULL);
     992                 :             :         }
     993                 :             :     }
     994                 :             : 
     995                 :         518 :     table_close(rel, RowExclusiveLock);
     996                 :             : 
     997         [ -  + ]:         518 :     InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
     998                 :             : 
     999                 :             :     /*
    1000                 :             :      * We don't need this warning message when wal_level >= 'replica' since
    1001                 :             :      * logical decoding is automatically enabled up on a logical slot
    1002                 :             :      * creation.
    1003                 :             :      */
    1004         [ +  + ]:         518 :     if (wal_level < WAL_LEVEL_REPLICA)
    1005         [ +  - ]:          16 :         ereport(WARNING,
    1006                 :             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1007                 :             :                  errmsg("logical decoding must be enabled to publish logical changes"),
    1008                 :             :                  errhint("Before creating subscriptions, ensure that \"wal_level\" is set to \"replica\" or higher.")));
    1009                 :             : 
    1010                 :         518 :     return myself;
    1011                 :             : }
    1012                 :             : 
    1013                 :             : /*
    1014                 :             :  * Change options of a publication.
    1015                 :             :  */
    1016                 :             : static void
    1017                 :          84 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
    1018                 :             :                         Relation rel, HeapTuple tup)
    1019                 :             : {
    1020                 :             :     bool        nulls[Natts_pg_publication];
    1021                 :             :     bool        replaces[Natts_pg_publication];
    1022                 :             :     Datum       values[Natts_pg_publication];
    1023                 :             :     bool        publish_given;
    1024                 :             :     PublicationActions pubactions;
    1025                 :             :     bool        publish_via_partition_root_given;
    1026                 :             :     bool        publish_via_partition_root;
    1027                 :             :     bool        publish_generated_columns_given;
    1028                 :             :     char        publish_generated_columns;
    1029                 :             :     ObjectAddress obj;
    1030                 :             :     Form_pg_publication pubform;
    1031                 :          84 :     List       *root_relids = NIL;
    1032                 :             :     ListCell   *lc;
    1033                 :             : 
    1034                 :          84 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1035                 :             : 
    1036                 :          84 :     parse_publication_options(pstate,
    1037                 :             :                               stmt->options,
    1038                 :             :                               &publish_given, &pubactions,
    1039                 :             :                               &publish_via_partition_root_given,
    1040                 :             :                               &publish_via_partition_root,
    1041                 :             :                               &publish_generated_columns_given,
    1042                 :             :                               &publish_generated_columns);
    1043                 :             : 
    1044         [ +  + ]:          84 :     if (pubform->puballsequences &&
    1045   [ +  +  +  -  :           8 :         (publish_given || publish_via_partition_root_given ||
                   +  - ]
    1046                 :             :          publish_generated_columns_given))
    1047         [ +  - ]:           8 :         ereport(NOTICE,
    1048                 :             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1049                 :             :                 errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
    1050                 :             : 
    1051                 :             :     /*
    1052                 :             :      * If the publication doesn't publish changes via the root partitioned
    1053                 :             :      * table, the partition's row filter and column list will be used. So
    1054                 :             :      * disallow using WHERE clause and column lists on partitioned table in
    1055                 :             :      * this case.
    1056                 :             :      */
    1057   [ +  +  +  + ]:          84 :     if (!pubform->puballtables && publish_via_partition_root_given &&
    1058         [ +  + ]:          53 :         !publish_via_partition_root)
    1059                 :             :     {
    1060                 :             :         /*
    1061                 :             :          * Lock the publication so nobody else can do anything with it. This
    1062                 :             :          * prevents concurrent alter to add partitioned table(s) with WHERE
    1063                 :             :          * clause(s) and/or column lists which we don't allow when not
    1064                 :             :          * publishing via root.
    1065                 :             :          */
    1066                 :          32 :         LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
    1067                 :             :                            AccessShareLock);
    1068                 :             : 
    1069                 :          32 :         root_relids = GetIncludedPublicationRelations(pubform->oid,
    1070                 :             :                                                       PUBLICATION_PART_ROOT);
    1071                 :             : 
    1072   [ +  -  +  +  :          56 :         foreach(lc, root_relids)
                   +  + ]
    1073                 :             :         {
    1074                 :          32 :             Oid         relid = lfirst_oid(lc);
    1075                 :             :             HeapTuple   rftuple;
    1076                 :             :             char        relkind;
    1077                 :             :             char       *relname;
    1078                 :             :             bool        has_rowfilter;
    1079                 :             :             bool        has_collist;
    1080                 :             : 
    1081                 :             :             /*
    1082                 :             :              * Beware: we don't have lock on the relations, so cope silently
    1083                 :             :              * with the cache lookups returning NULL.
    1084                 :             :              */
    1085                 :             : 
    1086                 :          32 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
    1087                 :             :                                       ObjectIdGetDatum(relid),
    1088                 :             :                                       ObjectIdGetDatum(pubform->oid));
    1089         [ -  + ]:          32 :             if (!HeapTupleIsValid(rftuple))
    1090                 :           0 :                 continue;
    1091                 :          32 :             has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
    1092                 :          32 :             has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
    1093   [ +  +  +  + ]:          32 :             if (!has_rowfilter && !has_collist)
    1094                 :             :             {
    1095                 :           8 :                 ReleaseSysCache(rftuple);
    1096                 :           8 :                 continue;
    1097                 :             :             }
    1098                 :             : 
    1099                 :          24 :             relkind = get_rel_relkind(relid);
    1100         [ +  + ]:          24 :             if (relkind != RELKIND_PARTITIONED_TABLE)
    1101                 :             :             {
    1102                 :          16 :                 ReleaseSysCache(rftuple);
    1103                 :          16 :                 continue;
    1104                 :             :             }
    1105                 :           8 :             relname = get_rel_name(relid);
    1106         [ -  + ]:           8 :             if (relname == NULL)    /* table concurrently dropped */
    1107                 :             :             {
    1108                 :           0 :                 ReleaseSysCache(rftuple);
    1109                 :           0 :                 continue;
    1110                 :             :             }
    1111                 :             : 
    1112         [ +  + ]:           8 :             if (has_rowfilter)
    1113         [ +  - ]:           4 :                 ereport(ERROR,
    1114                 :             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1115                 :             :                          errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
    1116                 :             :                                 "publish_via_partition_root",
    1117                 :             :                                 stmt->pubname),
    1118                 :             :                          errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
    1119                 :             :                                    relname, "publish_via_partition_root")));
    1120                 :             :             Assert(has_collist);
    1121         [ +  - ]:           4 :             ereport(ERROR,
    1122                 :             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1123                 :             :                      errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
    1124                 :             :                             "publish_via_partition_root",
    1125                 :             :                             stmt->pubname),
    1126                 :             :                      errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
    1127                 :             :                                relname, "publish_via_partition_root")));
    1128                 :             :         }
    1129                 :             :     }
    1130                 :             : 
    1131                 :             :     /* Everything ok, form a new tuple. */
    1132                 :          76 :     memset(values, 0, sizeof(values));
    1133                 :          76 :     memset(nulls, false, sizeof(nulls));
    1134                 :          76 :     memset(replaces, false, sizeof(replaces));
    1135                 :             : 
    1136         [ +  + ]:          76 :     if (publish_given)
    1137                 :             :     {
    1138                 :          22 :         values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
    1139                 :          22 :         replaces[Anum_pg_publication_pubinsert - 1] = true;
    1140                 :             : 
    1141                 :          22 :         values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
    1142                 :          22 :         replaces[Anum_pg_publication_pubupdate - 1] = true;
    1143                 :             : 
    1144                 :          22 :         values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
    1145                 :          22 :         replaces[Anum_pg_publication_pubdelete - 1] = true;
    1146                 :             : 
    1147                 :          22 :         values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
    1148                 :          22 :         replaces[Anum_pg_publication_pubtruncate - 1] = true;
    1149                 :             :     }
    1150                 :             : 
    1151         [ +  + ]:          76 :     if (publish_via_partition_root_given)
    1152                 :             :     {
    1153                 :          46 :         values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
    1154                 :          46 :         replaces[Anum_pg_publication_pubviaroot - 1] = true;
    1155                 :             :     }
    1156                 :             : 
    1157         [ +  + ]:          76 :     if (publish_generated_columns_given)
    1158                 :             :     {
    1159                 :           8 :         values[Anum_pg_publication_pubgencols - 1] = CharGetDatum(publish_generated_columns);
    1160                 :           8 :         replaces[Anum_pg_publication_pubgencols - 1] = true;
    1161                 :             :     }
    1162                 :             : 
    1163                 :          76 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
    1164                 :             :                             replaces);
    1165                 :             : 
    1166                 :             :     /* Update the catalog. */
    1167                 :          76 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    1168                 :             : 
    1169                 :          76 :     CommandCounterIncrement();
    1170                 :             : 
    1171                 :          76 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1172                 :             : 
    1173                 :             :     /* Invalidate the relcache. */
    1174         [ +  + ]:          76 :     if (pubform->puballtables)
    1175                 :             :     {
    1176                 :          13 :         CacheInvalidateRelcacheAll();
    1177                 :             :     }
    1178                 :             :     else
    1179                 :             :     {
    1180                 :          63 :         List       *relids = NIL;
    1181                 :          63 :         List       *schemarelids = NIL;
    1182                 :             : 
    1183                 :             :         /*
    1184                 :             :          * For any partitioned tables contained in the publication, we must
    1185                 :             :          * invalidate all partitions contained in the respective partition
    1186                 :             :          * trees, not just those explicitly mentioned in the publication.
    1187                 :             :          */
    1188         [ +  + ]:          63 :         if (root_relids == NIL)
    1189                 :          39 :             relids = GetIncludedPublicationRelations(pubform->oid,
    1190                 :             :                                                      PUBLICATION_PART_ALL);
    1191                 :             :         else
    1192                 :             :         {
    1193                 :             :             /*
    1194                 :             :              * We already got tables explicitly mentioned in the publication.
    1195                 :             :              * Now get all partitions for the partitioned table in the list.
    1196                 :             :              */
    1197   [ +  -  +  +  :          48 :             foreach(lc, root_relids)
                   +  + ]
    1198                 :          24 :                 relids = GetPubPartitionOptionRelations(relids,
    1199                 :             :                                                         PUBLICATION_PART_ALL,
    1200                 :             :                                                         lfirst_oid(lc));
    1201                 :             :         }
    1202                 :             : 
    1203                 :          63 :         schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
    1204                 :             :                                                         PUBLICATION_PART_ALL);
    1205                 :          63 :         relids = list_concat_unique_oid(relids, schemarelids);
    1206                 :             : 
    1207                 :          63 :         InvalidatePublicationRels(relids);
    1208                 :             :     }
    1209                 :             : 
    1210                 :          76 :     ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
    1211                 :          76 :     EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    1212                 :             :                                      (Node *) stmt);
    1213                 :             : 
    1214         [ -  + ]:          76 :     InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
    1215                 :          76 : }
    1216                 :             : 
    1217                 :             : /*
    1218                 :             :  * Invalidate the relations.
    1219                 :             :  */
    1220                 :             : void
    1221                 :        1693 : InvalidatePublicationRels(List *relids)
    1222                 :             : {
    1223                 :             :     /*
    1224                 :             :      * We don't want to send too many individual messages, at some point it's
    1225                 :             :      * cheaper to just reset whole relcache.
    1226                 :             :      */
    1227         [ +  - ]:        1693 :     if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
    1228                 :             :     {
    1229                 :             :         ListCell   *lc;
    1230                 :             : 
    1231   [ +  +  +  +  :       13969 :         foreach(lc, relids)
                   +  + ]
    1232                 :       12276 :             CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
    1233                 :             :     }
    1234                 :             :     else
    1235                 :           0 :         CacheInvalidateRelcacheAll();
    1236                 :        1693 : }
    1237                 :             : 
    1238                 :             : /*
    1239                 :             :  * Add or remove table to/from publication.
    1240                 :             :  */
    1241                 :             : static void
    1242                 :         621 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
    1243                 :             :                        List *tables, const char *queryString,
    1244                 :             :                        bool publish_schema)
    1245                 :             : {
    1246                 :         621 :     List       *rels = NIL;
    1247                 :         621 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1248                 :         621 :     Oid         pubid = pubform->oid;
    1249                 :             : 
    1250                 :             :     /*
    1251                 :             :      * Nothing to do if no objects, except in SET: for that it is quite
    1252                 :             :      * possible that user has not specified any tables in which case we need
    1253                 :             :      * to remove all the existing tables.
    1254                 :             :      */
    1255   [ +  +  +  + ]:         621 :     if (!tables && stmt->action != AP_SetObjects)
    1256                 :          48 :         return;
    1257                 :             : 
    1258                 :         573 :     rels = OpenTableList(tables);
    1259                 :             : 
    1260         [ +  + ]:         573 :     if (stmt->action == AP_AddObjects)
    1261                 :             :     {
    1262                 :         186 :         TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
    1263                 :             : 
    1264                 :         174 :         publish_schema |= is_schema_publication(pubid);
    1265                 :             : 
    1266                 :         174 :         CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
    1267                 :         174 :                                    pubform->pubviaroot);
    1268                 :             : 
    1269                 :         166 :         PublicationAddTables(pubid, rels, false, stmt);
    1270                 :             :     }
    1271         [ +  + ]:         387 :     else if (stmt->action == AP_DropObjects)
    1272                 :          66 :         PublicationDropTables(pubid, rels, false);
    1273                 :             :     else                        /* AP_SetObjects */
    1274                 :             :     {
    1275                 :         321 :         List       *oldrelids = NIL;
    1276                 :         321 :         List       *delrels = NIL;
    1277                 :             :         ListCell   *oldlc;
    1278                 :             : 
    1279   [ +  +  +  + ]:         321 :         if (stmt->for_all_tables || stmt->for_all_sequences)
    1280                 :             :         {
    1281                 :             :             /*
    1282                 :             :              * In FOR ALL TABLES mode, relations are tracked as exclusions
    1283                 :             :              * (EXCEPT clause). Fetch the current excluded relations so they
    1284                 :             :              * can be reconciled with the specified EXCEPT list.
    1285                 :             :              *
    1286                 :             :              * This applies only if the existing publication is already
    1287                 :             :              * defined as FOR ALL TABLES; otherwise, there are no exclusion
    1288                 :             :              * entries to process.
    1289                 :             :              */
    1290         [ +  + ]:          25 :             if (pubform->puballtables)
    1291                 :             :             {
    1292                 :          21 :                 oldrelids = GetExcludedPublicationTables(pubid,
    1293                 :             :                                                          PUBLICATION_PART_ROOT);
    1294                 :             :             }
    1295                 :             :         }
    1296                 :             :         else
    1297                 :             :         {
    1298                 :         296 :             oldrelids = GetIncludedPublicationRelations(pubid,
    1299                 :             :                                                         PUBLICATION_PART_ROOT);
    1300                 :             : 
    1301                 :         296 :             TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
    1302                 :             : 
    1303                 :         284 :             CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
    1304                 :         284 :                                        pubform->pubviaroot);
    1305                 :             :         }
    1306                 :             : 
    1307                 :             :         /*
    1308                 :             :          * To recreate the relation list for the publication, look for
    1309                 :             :          * existing relations that do not need to be dropped.
    1310                 :             :          */
    1311   [ +  +  +  +  :         591 :         foreach(oldlc, oldrelids)
                   +  + ]
    1312                 :             :         {
    1313                 :         298 :             Oid         oldrelid = lfirst_oid(oldlc);
    1314                 :             :             ListCell   *newlc;
    1315                 :             :             PublicationRelInfo *oldrel;
    1316                 :         298 :             bool        found = false;
    1317                 :             :             HeapTuple   rftuple;
    1318                 :         298 :             Node       *oldrelwhereclause = NULL;
    1319                 :         298 :             Bitmapset  *oldcolumns = NULL;
    1320                 :             : 
    1321                 :             :             /* look up the cache for the old relmap */
    1322                 :         298 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
    1323                 :             :                                       ObjectIdGetDatum(oldrelid),
    1324                 :             :                                       ObjectIdGetDatum(pubid));
    1325                 :             : 
    1326                 :             :             /*
    1327                 :             :              * See if the existing relation currently has a WHERE clause or a
    1328                 :             :              * column list. We need to compare those too.
    1329                 :             :              */
    1330         [ +  - ]:         298 :             if (HeapTupleIsValid(rftuple))
    1331                 :             :             {
    1332                 :         298 :                 bool        isnull = true;
    1333                 :             :                 Datum       whereClauseDatum;
    1334                 :             :                 Datum       columnListDatum;
    1335                 :             : 
    1336                 :             :                 /* Load the WHERE clause for this table. */
    1337                 :         298 :                 whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
    1338                 :             :                                                    Anum_pg_publication_rel_prqual,
    1339                 :             :                                                    &isnull);
    1340         [ +  + ]:         298 :                 if (!isnull)
    1341                 :         139 :                     oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
    1342                 :             : 
    1343                 :             :                 /* Transform the int2vector column list to a bitmap. */
    1344                 :         298 :                 columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
    1345                 :             :                                                   Anum_pg_publication_rel_prattrs,
    1346                 :             :                                                   &isnull);
    1347                 :             : 
    1348         [ +  + ]:         298 :                 if (!isnull)
    1349                 :          90 :                     oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
    1350                 :             : 
    1351                 :         298 :                 ReleaseSysCache(rftuple);
    1352                 :             :             }
    1353                 :             : 
    1354   [ +  +  +  +  :         562 :             foreach(newlc, rels)
                   +  + ]
    1355                 :             :             {
    1356                 :             :                 PublicationRelInfo *newpubrel;
    1357                 :             :                 Oid         newrelid;
    1358                 :         290 :                 Bitmapset  *newcolumns = NULL;
    1359                 :             : 
    1360                 :         290 :                 newpubrel = (PublicationRelInfo *) lfirst(newlc);
    1361                 :         290 :                 newrelid = RelationGetRelid(newpubrel->relation);
    1362                 :             : 
    1363                 :             :                 /*
    1364                 :             :                  * Validate the column list.  If the column list or WHERE
    1365                 :             :                  * clause changes, then the validation done here will be
    1366                 :             :                  * duplicated inside PublicationAddTables().  The validation
    1367                 :             :                  * is cheap enough that that seems harmless.
    1368                 :             :                  */
    1369                 :         290 :                 newcolumns = pub_collist_validate(newpubrel->relation,
    1370                 :             :                                                   newpubrel->columns);
    1371                 :             : 
    1372                 :             :                 /*
    1373                 :             :                  * Check if any of the new set of relations matches with the
    1374                 :             :                  * existing relations in the publication. Additionally, if the
    1375                 :             :                  * relation has an associated WHERE clause, check the WHERE
    1376                 :             :                  * expressions also match. Same for the column list. Drop the
    1377                 :             :                  * rest.
    1378                 :             :                  */
    1379         [ +  + ]:         282 :                 if (newrelid == oldrelid)
    1380                 :             :                 {
    1381   [ +  +  +  + ]:         202 :                     if (equal(oldrelwhereclause, newpubrel->whereClause) &&
    1382                 :          60 :                         bms_equal(oldcolumns, newcolumns))
    1383                 :             :                     {
    1384                 :          18 :                         found = true;
    1385                 :          18 :                         break;
    1386                 :             :                     }
    1387                 :             :                 }
    1388                 :             :             }
    1389                 :             : 
    1390                 :             :             /*
    1391                 :             :              * Add the non-matched relations to a list so that they can be
    1392                 :             :              * dropped.
    1393                 :             :              */
    1394         [ +  + ]:         290 :             if (!found)
    1395                 :             :             {
    1396                 :         272 :                 oldrel = palloc_object(PublicationRelInfo);
    1397                 :         272 :                 oldrel->whereClause = NULL;
    1398                 :         272 :                 oldrel->columns = NIL;
    1399                 :         272 :                 oldrel->except = false;
    1400                 :         272 :                 oldrel->relation = table_open(oldrelid,
    1401                 :             :                                               ShareUpdateExclusiveLock);
    1402                 :         272 :                 delrels = lappend(delrels, oldrel);
    1403                 :             :             }
    1404                 :             :         }
    1405                 :             : 
    1406                 :             :         /* And drop them. */
    1407                 :         293 :         PublicationDropTables(pubid, delrels, true);
    1408                 :             : 
    1409                 :             :         /*
    1410                 :             :          * Don't bother calculating the difference for adding, we'll catch and
    1411                 :             :          * skip existing ones when doing catalog update.
    1412                 :             :          */
    1413                 :         293 :         PublicationAddTables(pubid, rels, true, stmt);
    1414                 :             : 
    1415                 :         293 :         CloseTableList(delrels);
    1416                 :             :     }
    1417                 :             : 
    1418                 :         485 :     CloseTableList(rels);
    1419                 :             : }
    1420                 :             : 
    1421                 :             : /*
    1422                 :             :  * Alter the publication schemas.
    1423                 :             :  *
    1424                 :             :  * Add or remove schemas to/from publication.
    1425                 :             :  */
    1426                 :             : static void
    1427                 :         533 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
    1428                 :             :                         HeapTuple tup, List *schemaidlist)
    1429                 :             : {
    1430                 :         533 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1431                 :             : 
    1432                 :             :     /*
    1433                 :             :      * Nothing to do if no objects, except in SET: for that it is quite
    1434                 :             :      * possible that user has not specified any schemas in which case we need
    1435                 :             :      * to remove all the existing schemas.
    1436                 :             :      */
    1437   [ +  +  +  + ]:         533 :     if (!schemaidlist && stmt->action != AP_SetObjects)
    1438                 :         192 :         return;
    1439                 :             : 
    1440                 :             :     /*
    1441                 :             :      * Schema lock is held until the publication is altered to prevent
    1442                 :             :      * concurrent schema deletion.
    1443                 :             :      */
    1444                 :         341 :     LockSchemaList(schemaidlist);
    1445         [ +  + ]:         341 :     if (stmt->action == AP_AddObjects)
    1446                 :             :     {
    1447                 :             :         ListCell   *lc;
    1448                 :             :         List       *reloids;
    1449                 :             : 
    1450                 :          23 :         reloids = GetIncludedPublicationRelations(pubform->oid,
    1451                 :             :                                                   PUBLICATION_PART_ROOT);
    1452                 :             : 
    1453   [ +  +  +  +  :          32 :         foreach(lc, reloids)
                   +  + ]
    1454                 :             :         {
    1455                 :             :             HeapTuple   coltuple;
    1456                 :             : 
    1457                 :          13 :             coltuple = SearchSysCache2(PUBLICATIONRELMAP,
    1458                 :             :                                        ObjectIdGetDatum(lfirst_oid(lc)),
    1459                 :             :                                        ObjectIdGetDatum(pubform->oid));
    1460                 :             : 
    1461         [ -  + ]:          13 :             if (!HeapTupleIsValid(coltuple))
    1462                 :           0 :                 continue;
    1463                 :             : 
    1464                 :             :             /*
    1465                 :             :              * Disallow adding schema if column list is already part of the
    1466                 :             :              * publication. See CheckPubRelationColumnList.
    1467                 :             :              */
    1468         [ +  + ]:          13 :             if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
    1469         [ +  - ]:           4 :                 ereport(ERROR,
    1470                 :             :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1471                 :             :                         errmsg("cannot add schema to publication \"%s\"",
    1472                 :             :                                stmt->pubname),
    1473                 :             :                         errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
    1474                 :             : 
    1475                 :           9 :             ReleaseSysCache(coltuple);
    1476                 :             :         }
    1477                 :             : 
    1478                 :          19 :         PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
    1479                 :             :     }
    1480         [ +  + ]:         318 :     else if (stmt->action == AP_DropObjects)
    1481                 :          25 :         PublicationDropSchemas(pubform->oid, schemaidlist, false);
    1482                 :             :     else                        /* AP_SetObjects */
    1483                 :             :     {
    1484                 :         293 :         List       *oldschemaids = GetPublicationSchemas(pubform->oid);
    1485                 :         293 :         List       *delschemas = NIL;
    1486                 :             : 
    1487                 :             :         /* Identify which schemas should be dropped */
    1488                 :         293 :         delschemas = list_difference_oid(oldschemaids, schemaidlist);
    1489                 :             : 
    1490                 :             :         /*
    1491                 :             :          * Schema lock is held until the publication is altered to prevent
    1492                 :             :          * concurrent schema deletion.
    1493                 :             :          */
    1494                 :         293 :         LockSchemaList(delschemas);
    1495                 :             : 
    1496                 :             :         /* And drop them */
    1497                 :         293 :         PublicationDropSchemas(pubform->oid, delschemas, true);
    1498                 :             : 
    1499                 :             :         /*
    1500                 :             :          * Don't bother calculating the difference for adding, we'll catch and
    1501                 :             :          * skip existing ones when doing catalog update.
    1502                 :             :          */
    1503                 :         293 :         PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
    1504                 :             :     }
    1505                 :             : }
    1506                 :             : 
    1507                 :             : /*
    1508                 :             :  * Check if relations and schemas can be in a given publication and throw
    1509                 :             :  * appropriate error if not.
    1510                 :             :  */
    1511                 :             : static void
    1512                 :         685 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
    1513                 :             :                       List *tables, List *schemaidlist)
    1514                 :             : {
    1515                 :         685 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1516                 :             : 
    1517   [ +  +  +  +  :         685 :     if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
                   +  + ]
    1518         [ +  + ]:          67 :         schemaidlist && !superuser())
    1519         [ +  - ]:           4 :         ereport(ERROR,
    1520                 :             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1521                 :             :                  errmsg("must be superuser to add or set schemas")));
    1522                 :             : 
    1523   [ +  +  +  + ]:         681 :     if (stmt->for_all_tables && !superuser())
    1524         [ +  - ]:           8 :         ereport(ERROR,
    1525                 :             :                 errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1526                 :             :                 errmsg("must be superuser to set ALL TABLES"));
    1527                 :             : 
    1528   [ +  +  +  + ]:         673 :     if (stmt->for_all_sequences && !superuser())
    1529         [ +  - ]:           4 :         ereport(ERROR,
    1530                 :             :                 errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1531                 :             :                 errmsg("must be superuser to set ALL SEQUENCES"));
    1532                 :             : 
    1533                 :             :     /*
    1534                 :             :      * Check that user is allowed to manipulate the publication tables in
    1535                 :             :      * schema
    1536                 :             :      */
    1537   [ +  +  +  +  :         669 :     if (schemaidlist && (pubform->puballtables || pubform->puballsequences))
                   -  + ]
    1538                 :             :     {
    1539   [ +  -  -  + ]:          12 :         if (pubform->puballtables && pubform->puballsequences)
    1540         [ #  # ]:           0 :             ereport(ERROR,
    1541                 :             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1542                 :             :                     errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
    1543                 :             :                            NameStr(pubform->pubname)),
    1544                 :             :                     errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
    1545         [ +  - ]:          12 :         else if (pubform->puballtables)
    1546         [ +  - ]:          12 :             ereport(ERROR,
    1547                 :             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1548                 :             :                     errmsg("publication \"%s\" is defined as FOR ALL TABLES",
    1549                 :             :                            NameStr(pubform->pubname)),
    1550                 :             :                     errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications."));
    1551                 :             :         else
    1552         [ #  # ]:           0 :             ereport(ERROR,
    1553                 :             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1554                 :             :                     errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
    1555                 :             :                            NameStr(pubform->pubname)),
    1556                 :             :                     errdetail("Schemas cannot be added to or dropped from FOR ALL SEQUENCES publications."));
    1557                 :             :     }
    1558                 :             : 
    1559                 :             :     /* Check that user is allowed to manipulate the publication tables. */
    1560   [ +  +  +  +  :         657 :     if (tables && (pubform->puballtables || pubform->puballsequences))
                   -  + ]
    1561                 :             :     {
    1562   [ +  -  -  + ]:          12 :         if (pubform->puballtables && pubform->puballsequences)
    1563         [ #  # ]:           0 :             ereport(ERROR,
    1564                 :             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1565                 :             :                     errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
    1566                 :             :                            NameStr(pubform->pubname)),
    1567                 :             :                     errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
    1568         [ +  - ]:          12 :         else if (pubform->puballtables)
    1569         [ +  - ]:          12 :             ereport(ERROR,
    1570                 :             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1571                 :             :                     errmsg("publication \"%s\" is defined as FOR ALL TABLES",
    1572                 :             :                            NameStr(pubform->pubname)),
    1573                 :             :                     errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES publications."));
    1574                 :             :         else
    1575         [ #  # ]:           0 :             ereport(ERROR,
    1576                 :             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1577                 :             :                     errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
    1578                 :             :                            NameStr(pubform->pubname)),
    1579                 :             :                     errdetail("Tables or sequences cannot be added to or dropped from FOR ALL SEQUENCES publications."));
    1580                 :             :     }
    1581                 :             : 
    1582   [ +  +  +  + ]:         645 :     if (stmt->for_all_tables || stmt->for_all_sequences)
    1583                 :             :     {
    1584                 :             :         /*
    1585                 :             :          * If the publication already contains specific tables or schemas, we
    1586                 :             :          * prevent switching to a ALL state.
    1587                 :             :          */
    1588   [ +  +  +  + ]:          86 :         if (is_table_publication(pubform->oid) ||
    1589                 :          37 :             is_schema_publication(pubform->oid))
    1590                 :             :         {
    1591   [ +  -  +  + ]:          24 :             ereport(ERROR,
    1592                 :             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1593                 :             :                     stmt->for_all_tables ?
    1594                 :             :                     errmsg("publication \"%s\" does not support ALL TABLES operations", NameStr(pubform->pubname)) :
    1595                 :             :                     errmsg("publication \"%s\" does not support ALL SEQUENCES operations", NameStr(pubform->pubname)),
    1596                 :             :                     errdetail("This operation requires the publication to be defined as FOR ALL TABLES/SEQUENCES or to be empty."));
    1597                 :             :         }
    1598                 :             :     }
    1599                 :         621 : }
    1600                 :             : 
    1601                 :             : /*
    1602                 :             :  * Update FOR ALL TABLES / FOR ALL SEQUENCES flags of a publication.
    1603                 :             :  */
    1604                 :             : static void
    1605                 :         521 : AlterPublicationAllFlags(AlterPublicationStmt *stmt, Relation rel,
    1606                 :             :                          HeapTuple tup)
    1607                 :             : {
    1608                 :             :     Form_pg_publication pubform;
    1609                 :         521 :     bool        nulls[Natts_pg_publication] = {0};
    1610                 :         521 :     bool        replaces[Natts_pg_publication] = {0};
    1611                 :         521 :     Datum       values[Natts_pg_publication] = {0};
    1612                 :         521 :     bool        dirty = false;
    1613                 :             : 
    1614   [ +  +  +  + ]:         521 :     if (!stmt->for_all_tables && !stmt->for_all_sequences)
    1615                 :         496 :         return;
    1616                 :             : 
    1617                 :          25 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1618                 :             : 
    1619                 :             :     /* Update FOR ALL TABLES flag if changed */
    1620         [ +  + ]:          25 :     if (stmt->for_all_tables != pubform->puballtables)
    1621                 :             :     {
    1622                 :           8 :         values[Anum_pg_publication_puballtables - 1] =
    1623                 :           8 :             BoolGetDatum(stmt->for_all_tables);
    1624                 :           8 :         replaces[Anum_pg_publication_puballtables - 1] = true;
    1625                 :           8 :         dirty = true;
    1626                 :             :     }
    1627                 :             : 
    1628                 :             :     /* Update FOR ALL SEQUENCES flag if changed */
    1629         [ +  + ]:          25 :     if (stmt->for_all_sequences != pubform->puballsequences)
    1630                 :             :     {
    1631                 :          12 :         values[Anum_pg_publication_puballsequences - 1] =
    1632                 :          12 :             BoolGetDatum(stmt->for_all_sequences);
    1633                 :          12 :         replaces[Anum_pg_publication_puballsequences - 1] = true;
    1634                 :          12 :         dirty = true;
    1635                 :             :     }
    1636                 :             : 
    1637         [ +  + ]:          25 :     if (dirty)
    1638                 :             :     {
    1639                 :          12 :         tup = heap_modify_tuple(tup, RelationGetDescr(rel), values,
    1640                 :             :                                 nulls, replaces);
    1641                 :          12 :         CatalogTupleUpdate(rel, &tup->t_self, tup);
    1642                 :          12 :         CommandCounterIncrement();
    1643                 :             : 
    1644                 :             :         /* For ALL TABLES, we must invalidate all relcache entries */
    1645         [ +  + ]:          12 :         if (replaces[Anum_pg_publication_puballtables - 1])
    1646                 :           8 :             CacheInvalidateRelcacheAll();
    1647                 :             :     }
    1648                 :             : }
    1649                 :             : 
    1650                 :             : /*
    1651                 :             :  * Alter the existing publication.
    1652                 :             :  *
    1653                 :             :  * This is dispatcher function for AlterPublicationOptions,
    1654                 :             :  * AlterPublicationSchemas and AlterPublicationTables.
    1655                 :             :  */
    1656                 :             : void
    1657                 :         781 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
    1658                 :             : {
    1659                 :             :     Relation    rel;
    1660                 :             :     HeapTuple   tup;
    1661                 :             :     Form_pg_publication pubform;
    1662                 :             : 
    1663                 :         781 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    1664                 :             : 
    1665                 :         781 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME,
    1666                 :             :                               CStringGetDatum(stmt->pubname));
    1667                 :             : 
    1668         [ -  + ]:         781 :     if (!HeapTupleIsValid(tup))
    1669         [ #  # ]:           0 :         ereport(ERROR,
    1670                 :             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1671                 :             :                  errmsg("publication \"%s\" does not exist",
    1672                 :             :                         stmt->pubname)));
    1673                 :             : 
    1674                 :         781 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1675                 :             : 
    1676                 :             :     /* must be owner */
    1677         [ -  + ]:         781 :     if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
    1678                 :           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
    1679                 :           0 :                        stmt->pubname);
    1680                 :             : 
    1681         [ +  + ]:         781 :     if (stmt->options)
    1682                 :          84 :         AlterPublicationOptions(pstate, stmt, rel, tup);
    1683                 :             :     else
    1684                 :             :     {
    1685                 :         697 :         List       *relations = NIL;
    1686                 :         697 :         List       *exceptrelations = NIL;
    1687                 :         697 :         List       *schemaidlist = NIL;
    1688                 :         697 :         Oid         pubid = pubform->oid;
    1689                 :             : 
    1690                 :         697 :         ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
    1691                 :             :                                    &exceptrelations, &schemaidlist);
    1692                 :             : 
    1693                 :         685 :         CheckAlterPublication(stmt, tup, relations, schemaidlist);
    1694                 :             : 
    1695                 :         621 :         heap_freetuple(tup);
    1696                 :             : 
    1697                 :             :         /* Lock the publication so nobody else can do anything with it. */
    1698                 :         621 :         LockDatabaseObject(PublicationRelationId, pubid, 0,
    1699                 :             :                            AccessExclusiveLock);
    1700                 :             : 
    1701                 :             :         /*
    1702                 :             :          * It is possible that by the time we acquire the lock on publication,
    1703                 :             :          * concurrent DDL has removed it. We can test this by checking the
    1704                 :             :          * existence of publication. We get the tuple again to avoid the risk
    1705                 :             :          * of any publication option getting changed.
    1706                 :             :          */
    1707                 :         621 :         tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1708         [ -  + ]:         621 :         if (!HeapTupleIsValid(tup))
    1709         [ #  # ]:           0 :             ereport(ERROR,
    1710                 :             :                     errcode(ERRCODE_UNDEFINED_OBJECT),
    1711                 :             :                     errmsg("publication \"%s\" does not exist",
    1712                 :             :                            stmt->pubname));
    1713                 :             : 
    1714                 :         621 :         relations = list_concat(relations, exceptrelations);
    1715                 :         621 :         AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
    1716                 :             :                                schemaidlist != NIL);
    1717                 :         533 :         AlterPublicationSchemas(stmt, tup, schemaidlist);
    1718                 :         521 :         AlterPublicationAllFlags(stmt, rel, tup);
    1719                 :             :     }
    1720                 :             : 
    1721                 :             :     /* Cleanup. */
    1722                 :         597 :     heap_freetuple(tup);
    1723                 :         597 :     table_close(rel, RowExclusiveLock);
    1724                 :         597 : }
    1725                 :             : 
    1726                 :             : /*
    1727                 :             :  * Remove relation from publication by mapping OID.
    1728                 :             :  */
    1729                 :             : void
    1730                 :         641 : RemovePublicationRelById(Oid proid)
    1731                 :             : {
    1732                 :             :     Relation    rel;
    1733                 :             :     HeapTuple   tup;
    1734                 :             :     Form_pg_publication_rel pubrel;
    1735                 :         641 :     List       *relids = NIL;
    1736                 :             : 
    1737                 :         641 :     rel = table_open(PublicationRelRelationId, RowExclusiveLock);
    1738                 :             : 
    1739                 :         641 :     tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
    1740                 :             : 
    1741         [ -  + ]:         641 :     if (!HeapTupleIsValid(tup))
    1742         [ #  # ]:           0 :         elog(ERROR, "cache lookup failed for publication table %u",
    1743                 :             :              proid);
    1744                 :             : 
    1745                 :         641 :     pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
    1746                 :             : 
    1747                 :             :     /*
    1748                 :             :      * Invalidate relcache so that publication info is rebuilt.
    1749                 :             :      *
    1750                 :             :      * For the partitioned tables, we must invalidate all partitions contained
    1751                 :             :      * in the respective partition hierarchies, not just the one explicitly
    1752                 :             :      * mentioned in the publication. This is required because we implicitly
    1753                 :             :      * publish the child tables when the parent table is published.
    1754                 :             :      */
    1755                 :         641 :     relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
    1756                 :             :                                             pubrel->prrelid);
    1757                 :             : 
    1758                 :         641 :     InvalidatePublicationRels(relids);
    1759                 :             : 
    1760                 :         641 :     CatalogTupleDelete(rel, &tup->t_self);
    1761                 :             : 
    1762                 :         641 :     ReleaseSysCache(tup);
    1763                 :             : 
    1764                 :         641 :     table_close(rel, RowExclusiveLock);
    1765                 :         641 : }
    1766                 :             : 
    1767                 :             : /*
    1768                 :             :  * Remove the publication by mapping OID.
    1769                 :             :  */
    1770                 :             : void
    1771                 :         392 : RemovePublicationById(Oid pubid)
    1772                 :             : {
    1773                 :             :     Relation    rel;
    1774                 :             :     HeapTuple   tup;
    1775                 :             :     Form_pg_publication pubform;
    1776                 :             : 
    1777                 :         392 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    1778                 :             : 
    1779                 :         392 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1780         [ -  + ]:         392 :     if (!HeapTupleIsValid(tup))
    1781         [ #  # ]:           0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
    1782                 :             : 
    1783                 :         392 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1784                 :             : 
    1785                 :             :     /* Invalidate relcache so that publication info is rebuilt. */
    1786         [ +  + ]:         392 :     if (pubform->puballtables)
    1787                 :          91 :         CacheInvalidateRelcacheAll();
    1788                 :             : 
    1789                 :         392 :     CatalogTupleDelete(rel, &tup->t_self);
    1790                 :             : 
    1791                 :         392 :     ReleaseSysCache(tup);
    1792                 :             : 
    1793                 :         392 :     table_close(rel, RowExclusiveLock);
    1794                 :         392 : }
    1795                 :             : 
    1796                 :             : /*
    1797                 :             :  * Remove schema from publication by mapping OID.
    1798                 :             :  */
    1799                 :             : void
    1800                 :         131 : RemovePublicationSchemaById(Oid psoid)
    1801                 :             : {
    1802                 :             :     Relation    rel;
    1803                 :             :     HeapTuple   tup;
    1804                 :         131 :     List       *schemaRels = NIL;
    1805                 :             :     Form_pg_publication_namespace pubsch;
    1806                 :             : 
    1807                 :         131 :     rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
    1808                 :             : 
    1809                 :         131 :     tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
    1810                 :             : 
    1811         [ -  + ]:         131 :     if (!HeapTupleIsValid(tup))
    1812         [ #  # ]:           0 :         elog(ERROR, "cache lookup failed for publication schema %u", psoid);
    1813                 :             : 
    1814                 :         131 :     pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
    1815                 :             : 
    1816                 :             :     /*
    1817                 :             :      * Invalidate relcache so that publication info is rebuilt. See
    1818                 :             :      * RemovePublicationRelById for why we need to consider all the
    1819                 :             :      * partitions.
    1820                 :             :      */
    1821                 :         131 :     schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
    1822                 :             :                                                PUBLICATION_PART_ALL);
    1823                 :         131 :     InvalidatePublicationRels(schemaRels);
    1824                 :             : 
    1825                 :         131 :     CatalogTupleDelete(rel, &tup->t_self);
    1826                 :             : 
    1827                 :         131 :     ReleaseSysCache(tup);
    1828                 :             : 
    1829                 :         131 :     table_close(rel, RowExclusiveLock);
    1830                 :         131 : }
    1831                 :             : 
    1832                 :             : /*
    1833                 :             :  * Open relations specified by a PublicationTable list.
    1834                 :             :  * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
    1835                 :             :  * add them to a publication.
    1836                 :             :  */
    1837                 :             : static List *
    1838                 :         926 : OpenTableList(List *tables)
    1839                 :             : {
    1840                 :         926 :     List       *relids = NIL;
    1841                 :         926 :     List       *rels = NIL;
    1842                 :             :     ListCell   *lc;
    1843                 :         926 :     List       *relids_with_rf = NIL;
    1844                 :         926 :     List       *relids_with_collist = NIL;
    1845                 :             : 
    1846                 :             :     /*
    1847                 :             :      * Open, share-lock, and check all the explicitly-specified relations
    1848                 :             :      */
    1849   [ +  +  +  +  :        1903 :     foreach(lc, tables)
                   +  + ]
    1850                 :             :     {
    1851                 :         997 :         PublicationTable *t = lfirst_node(PublicationTable, lc);
    1852                 :         997 :         bool        recurse = t->relation->inh;
    1853                 :             :         Relation    rel;
    1854                 :             :         Oid         myrelid;
    1855                 :             :         PublicationRelInfo *pub_rel;
    1856                 :             : 
    1857                 :             :         /* Allow query cancel in case this takes a long time */
    1858         [ -  + ]:         997 :         CHECK_FOR_INTERRUPTS();
    1859                 :             : 
    1860                 :         997 :         rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
    1861                 :         993 :         myrelid = RelationGetRelid(rel);
    1862                 :             : 
    1863                 :             :         /*
    1864                 :             :          * Filter out duplicates if user specifies "foo, foo".
    1865                 :             :          *
    1866                 :             :          * Note that this algorithm is known to not be very efficient (O(N^2))
    1867                 :             :          * but given that it only works on list of tables given to us by user
    1868                 :             :          * it's deemed acceptable.
    1869                 :             :          */
    1870         [ +  + ]:         993 :         if (list_member_oid(relids, myrelid))
    1871                 :             :         {
    1872                 :             :             /* Disallow duplicate tables if there are any with row filters. */
    1873   [ +  +  +  + ]:          16 :             if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
    1874         [ +  - ]:           8 :                 ereport(ERROR,
    1875                 :             :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
    1876                 :             :                          errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
    1877                 :             :                                 RelationGetRelationName(rel))));
    1878                 :             : 
    1879                 :             :             /* Disallow duplicate tables if there are any with column lists. */
    1880   [ +  +  +  - ]:           8 :             if (t->columns || list_member_oid(relids_with_collist, myrelid))
    1881         [ +  - ]:           8 :                 ereport(ERROR,
    1882                 :             :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
    1883                 :             :                          errmsg("conflicting or redundant column lists for table \"%s\"",
    1884                 :             :                                 RelationGetRelationName(rel))));
    1885                 :             : 
    1886                 :           0 :             table_close(rel, ShareUpdateExclusiveLock);
    1887                 :           0 :             continue;
    1888                 :             :         }
    1889                 :             : 
    1890                 :         977 :         pub_rel = palloc_object(PublicationRelInfo);
    1891                 :         977 :         pub_rel->relation = rel;
    1892                 :         977 :         pub_rel->whereClause = t->whereClause;
    1893                 :         977 :         pub_rel->columns = t->columns;
    1894                 :         977 :         pub_rel->except = t->except;
    1895                 :         977 :         rels = lappend(rels, pub_rel);
    1896                 :         977 :         relids = lappend_oid(relids, myrelid);
    1897                 :             : 
    1898         [ +  + ]:         977 :         if (t->whereClause)
    1899                 :         269 :             relids_with_rf = lappend_oid(relids_with_rf, myrelid);
    1900                 :             : 
    1901         [ +  + ]:         977 :         if (t->columns)
    1902                 :         264 :             relids_with_collist = lappend_oid(relids_with_collist, myrelid);
    1903                 :             : 
    1904                 :             :         /*
    1905                 :             :          * Add children of this rel, if requested, so that they too are added
    1906                 :             :          * to the publication.  A partitioned table can't have any inheritance
    1907                 :             :          * children other than its partitions, which need not be explicitly
    1908                 :             :          * added to the publication.
    1909                 :             :          */
    1910   [ +  +  +  + ]:         977 :         if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
    1911                 :             :         {
    1912                 :             :             List       *children;
    1913                 :             :             ListCell   *child;
    1914                 :             : 
    1915                 :         805 :             children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
    1916                 :             :                                            NULL);
    1917                 :             : 
    1918   [ +  -  +  +  :        1625 :             foreach(child, children)
                   +  + ]
    1919                 :             :             {
    1920                 :         820 :                 Oid         childrelid = lfirst_oid(child);
    1921                 :             : 
    1922                 :             :                 /* Allow query cancel in case this takes a long time */
    1923         [ -  + ]:         820 :                 CHECK_FOR_INTERRUPTS();
    1924                 :             : 
    1925                 :             :                 /*
    1926                 :             :                  * Skip duplicates if user specified both parent and child
    1927                 :             :                  * tables.
    1928                 :             :                  */
    1929         [ +  + ]:         820 :                 if (list_member_oid(relids, childrelid))
    1930                 :             :                 {
    1931                 :             :                     /*
    1932                 :             :                      * We don't allow to specify row filter for both parent
    1933                 :             :                      * and child table at the same time as it is not very
    1934                 :             :                      * clear which one should be given preference.
    1935                 :             :                      */
    1936         [ -  + ]:         805 :                     if (childrelid != myrelid &&
    1937   [ #  #  #  # ]:           0 :                         (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
    1938         [ #  # ]:           0 :                         ereport(ERROR,
    1939                 :             :                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    1940                 :             :                                  errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
    1941                 :             :                                         RelationGetRelationName(rel))));
    1942                 :             : 
    1943                 :             :                     /*
    1944                 :             :                      * We don't allow to specify column list for both parent
    1945                 :             :                      * and child table at the same time as it is not very
    1946                 :             :                      * clear which one should be given preference.
    1947                 :             :                      */
    1948         [ -  + ]:         805 :                     if (childrelid != myrelid &&
    1949   [ #  #  #  # ]:           0 :                         (t->columns || list_member_oid(relids_with_collist, childrelid)))
    1950         [ #  # ]:           0 :                         ereport(ERROR,
    1951                 :             :                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    1952                 :             :                                  errmsg("conflicting or redundant column lists for table \"%s\"",
    1953                 :             :                                         RelationGetRelationName(rel))));
    1954                 :             : 
    1955                 :         805 :                     continue;
    1956                 :             :                 }
    1957                 :             : 
    1958                 :             :                 /* find_all_inheritors already got lock */
    1959                 :          15 :                 rel = table_open(childrelid, NoLock);
    1960                 :          15 :                 pub_rel = palloc_object(PublicationRelInfo);
    1961                 :          15 :                 pub_rel->relation = rel;
    1962                 :             :                 /* child inherits WHERE clause from parent */
    1963                 :          15 :                 pub_rel->whereClause = t->whereClause;
    1964                 :             : 
    1965                 :             :                 /* child inherits column list from parent */
    1966                 :          15 :                 pub_rel->columns = t->columns;
    1967                 :          15 :                 pub_rel->except = t->except;
    1968                 :          15 :                 rels = lappend(rels, pub_rel);
    1969                 :          15 :                 relids = lappend_oid(relids, childrelid);
    1970                 :             : 
    1971         [ +  + ]:          15 :                 if (t->whereClause)
    1972                 :           1 :                     relids_with_rf = lappend_oid(relids_with_rf, childrelid);
    1973                 :             : 
    1974         [ -  + ]:          15 :                 if (t->columns)
    1975                 :           0 :                     relids_with_collist = lappend_oid(relids_with_collist, childrelid);
    1976                 :             :             }
    1977                 :             :         }
    1978                 :             :     }
    1979                 :             : 
    1980                 :         906 :     list_free(relids);
    1981                 :         906 :     list_free(relids_with_rf);
    1982                 :             : 
    1983                 :         906 :     return rels;
    1984                 :             : }
    1985                 :             : 
    1986                 :             : /*
    1987                 :             :  * Close all relations in the list.
    1988                 :             :  */
    1989                 :             : static void
    1990                 :        1066 : CloseTableList(List *rels)
    1991                 :             : {
    1992                 :             :     ListCell   *lc;
    1993                 :             : 
    1994   [ +  +  +  +  :        2181 :     foreach(lc, rels)
                   +  + ]
    1995                 :             :     {
    1996                 :             :         PublicationRelInfo *pub_rel;
    1997                 :             : 
    1998                 :        1115 :         pub_rel = (PublicationRelInfo *) lfirst(lc);
    1999                 :        1115 :         table_close(pub_rel->relation, NoLock);
    2000                 :             :     }
    2001                 :             : 
    2002                 :        1066 :     list_free_deep(rels);
    2003                 :        1066 : }
    2004                 :             : 
    2005                 :             : /*
    2006                 :             :  * Lock the schemas specified in the schema list in AccessShareLock mode in
    2007                 :             :  * order to prevent concurrent schema deletion.
    2008                 :             :  */
    2009                 :             : static void
    2010                 :         738 : LockSchemaList(List *schemalist)
    2011                 :             : {
    2012                 :             :     ListCell   *lc;
    2013                 :             : 
    2014   [ +  +  +  +  :         954 :     foreach(lc, schemalist)
                   +  + ]
    2015                 :             :     {
    2016                 :         216 :         Oid         schemaid = lfirst_oid(lc);
    2017                 :             : 
    2018                 :             :         /* Allow query cancel in case this takes a long time */
    2019         [ -  + ]:         216 :         CHECK_FOR_INTERRUPTS();
    2020                 :         216 :         LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
    2021                 :             : 
    2022                 :             :         /*
    2023                 :             :          * It is possible that by the time we acquire the lock on schema,
    2024                 :             :          * concurrent DDL has removed it. We can test this by checking the
    2025                 :             :          * existence of schema.
    2026                 :             :          */
    2027         [ -  + ]:         216 :         if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
    2028         [ #  # ]:           0 :             ereport(ERROR,
    2029                 :             :                     errcode(ERRCODE_UNDEFINED_SCHEMA),
    2030                 :             :                     errmsg("schema with OID %u does not exist", schemaid));
    2031                 :             :     }
    2032                 :         738 : }
    2033                 :             : 
    2034                 :             : /*
    2035                 :             :  * Add listed tables to the publication.
    2036                 :             :  */
    2037                 :             : static void
    2038                 :         772 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
    2039                 :             :                      AlterPublicationStmt *stmt)
    2040                 :             : {
    2041                 :             :     ListCell   *lc;
    2042                 :             : 
    2043   [ +  +  +  +  :        1557 :     foreach(lc, rels)
                   +  + ]
    2044                 :             :     {
    2045                 :         838 :         PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
    2046                 :         838 :         Relation    rel = pub_rel->relation;
    2047                 :             :         ObjectAddress obj;
    2048                 :             : 
    2049                 :             :         /* Must be owner of the table or superuser. */
    2050         [ +  + ]:         838 :         if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
    2051                 :           4 :             aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
    2052                 :           4 :                            RelationGetRelationName(rel));
    2053                 :             : 
    2054                 :         834 :         obj = publication_add_relation(pubid, pub_rel, if_not_exists, stmt);
    2055         [ +  + ]:         785 :         if (stmt)
    2056                 :             :         {
    2057                 :         419 :             EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    2058                 :             :                                              (Node *) stmt);
    2059                 :             : 
    2060         [ -  + ]:         419 :             InvokeObjectPostCreateHook(PublicationRelRelationId,
    2061                 :             :                                        obj.objectId, 0);
    2062                 :             :         }
    2063                 :             :     }
    2064                 :         719 : }
    2065                 :             : 
    2066                 :             : /*
    2067                 :             :  * Remove listed tables from the publication.
    2068                 :             :  */
    2069                 :             : static void
    2070                 :         359 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
    2071                 :             : {
    2072                 :             :     ObjectAddress obj;
    2073                 :             :     ListCell   *lc;
    2074                 :             :     Oid         prid;
    2075                 :             : 
    2076   [ +  +  +  +  :         689 :     foreach(lc, rels)
                   +  + ]
    2077                 :             :     {
    2078                 :         342 :         PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
    2079                 :         342 :         Relation    rel = pubrel->relation;
    2080                 :         342 :         Oid         relid = RelationGetRelid(rel);
    2081                 :             : 
    2082         [ -  + ]:         342 :         if (pubrel->columns)
    2083         [ #  # ]:           0 :             ereport(ERROR,
    2084                 :             :                     errcode(ERRCODE_SYNTAX_ERROR),
    2085                 :             :                     errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
    2086                 :             : 
    2087                 :         342 :         prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
    2088                 :             :                                ObjectIdGetDatum(relid),
    2089                 :             :                                ObjectIdGetDatum(pubid));
    2090         [ +  + ]:         342 :         if (!OidIsValid(prid))
    2091                 :             :         {
    2092         [ -  + ]:           8 :             if (missing_ok)
    2093                 :           0 :                 continue;
    2094                 :             : 
    2095         [ +  - ]:           8 :             ereport(ERROR,
    2096                 :             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    2097                 :             :                      errmsg("relation \"%s\" is not part of the publication",
    2098                 :             :                             RelationGetRelationName(rel))));
    2099                 :             :         }
    2100                 :             : 
    2101         [ +  + ]:         334 :         if (pubrel->whereClause)
    2102         [ +  - ]:           4 :             ereport(ERROR,
    2103                 :             :                     (errcode(ERRCODE_SYNTAX_ERROR),
    2104                 :             :                      errmsg("cannot use a WHERE clause when removing a table from a publication")));
    2105                 :             : 
    2106                 :         330 :         ObjectAddressSet(obj, PublicationRelRelationId, prid);
    2107                 :         330 :         performDeletion(&obj, DROP_CASCADE, 0);
    2108                 :             :     }
    2109                 :         347 : }
    2110                 :             : 
    2111                 :             : /*
    2112                 :             :  * Add listed schemas to the publication.
    2113                 :             :  */
    2114                 :             : static void
    2115                 :         416 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
    2116                 :             :                       AlterPublicationStmt *stmt)
    2117                 :             : {
    2118                 :             :     ListCell   *lc;
    2119                 :             : 
    2120   [ +  +  +  +  :         583 :     foreach(lc, schemas)
                   +  + ]
    2121                 :             :     {
    2122                 :         175 :         Oid         schemaid = lfirst_oid(lc);
    2123                 :             :         ObjectAddress obj;
    2124                 :             : 
    2125                 :         175 :         obj = publication_add_schema(pubid, schemaid, if_not_exists);
    2126         [ +  + ]:         167 :         if (stmt)
    2127                 :             :         {
    2128                 :          43 :             EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    2129                 :             :                                              (Node *) stmt);
    2130                 :             : 
    2131         [ -  + ]:          43 :             InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
    2132                 :             :                                        obj.objectId, 0);
    2133                 :             :         }
    2134                 :             :     }
    2135                 :         408 : }
    2136                 :             : 
    2137                 :             : /*
    2138                 :             :  * Remove listed schemas from the publication.
    2139                 :             :  */
    2140                 :             : static void
    2141                 :         318 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
    2142                 :             : {
    2143                 :             :     ObjectAddress obj;
    2144                 :             :     ListCell   *lc;
    2145                 :             :     Oid         psid;
    2146                 :             : 
    2147   [ +  +  +  +  :         351 :     foreach(lc, schemas)
                   +  + ]
    2148                 :             :     {
    2149                 :          37 :         Oid         schemaid = lfirst_oid(lc);
    2150                 :             : 
    2151                 :          37 :         psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
    2152                 :             :                                Anum_pg_publication_namespace_oid,
    2153                 :             :                                ObjectIdGetDatum(schemaid),
    2154                 :             :                                ObjectIdGetDatum(pubid));
    2155         [ +  + ]:          37 :         if (!OidIsValid(psid))
    2156                 :             :         {
    2157         [ -  + ]:           4 :             if (missing_ok)
    2158                 :           0 :                 continue;
    2159                 :             : 
    2160         [ +  - ]:           4 :             ereport(ERROR,
    2161                 :             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    2162                 :             :                      errmsg("tables from schema \"%s\" are not part of the publication",
    2163                 :             :                             get_namespace_name(schemaid))));
    2164                 :             :         }
    2165                 :             : 
    2166                 :          33 :         ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
    2167                 :          33 :         performDeletion(&obj, DROP_CASCADE, 0);
    2168                 :             :     }
    2169                 :         314 : }
    2170                 :             : 
    2171                 :             : /*
    2172                 :             :  * Internal workhorse for changing a publication owner
    2173                 :             :  */
    2174                 :             : static void
    2175                 :          26 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
    2176                 :             : {
    2177                 :             :     Form_pg_publication form;
    2178                 :             : 
    2179                 :          26 :     form = (Form_pg_publication) GETSTRUCT(tup);
    2180                 :             : 
    2181         [ +  + ]:          26 :     if (form->pubowner == newOwnerId)
    2182                 :           6 :         return;
    2183                 :             : 
    2184         [ +  + ]:          20 :     if (!superuser())
    2185                 :             :     {
    2186                 :             :         AclResult   aclresult;
    2187                 :             : 
    2188                 :             :         /* Must be owner */
    2189         [ -  + ]:           8 :         if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
    2190                 :           0 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
    2191                 :           0 :                            NameStr(form->pubname));
    2192                 :             : 
    2193                 :             :         /* Must be able to become new owner */
    2194                 :           8 :         check_can_set_role(GetUserId(), newOwnerId);
    2195                 :             : 
    2196                 :             :         /* New owner must have CREATE privilege on database */
    2197                 :           8 :         aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
    2198         [ -  + ]:           8 :         if (aclresult != ACLCHECK_OK)
    2199                 :           0 :             aclcheck_error(aclresult, OBJECT_DATABASE,
    2200                 :           0 :                            get_database_name(MyDatabaseId));
    2201                 :             : 
    2202         [ +  + ]:           8 :         if (!superuser_arg(newOwnerId))
    2203                 :             :         {
    2204   [ +  -  +  -  :           8 :             if (form->puballtables || form->puballsequences ||
                   +  - ]
    2205                 :           4 :                 is_schema_publication(form->oid))
    2206         [ +  - ]:           4 :                 ereport(ERROR,
    2207                 :             :                         errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    2208                 :             :                         errmsg("permission denied to change owner of publication \"%s\"",
    2209                 :             :                                NameStr(form->pubname)),
    2210                 :             :                         errhint("The owner of a FOR ALL TABLES or ALL SEQUENCES or TABLES IN SCHEMA publication must be a superuser."));
    2211                 :             :         }
    2212                 :             :     }
    2213                 :             : 
    2214                 :          16 :     form->pubowner = newOwnerId;
    2215                 :          16 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    2216                 :             : 
    2217                 :             :     /* Update owner dependency reference */
    2218                 :          16 :     changeDependencyOnOwner(PublicationRelationId,
    2219                 :             :                             form->oid,
    2220                 :             :                             newOwnerId);
    2221                 :             : 
    2222         [ -  + ]:          16 :     InvokeObjectPostAlterHook(PublicationRelationId,
    2223                 :             :                               form->oid, 0);
    2224                 :             : }
    2225                 :             : 
    2226                 :             : /*
    2227                 :             :  * Change publication owner -- by name
    2228                 :             :  */
    2229                 :             : ObjectAddress
    2230                 :          26 : AlterPublicationOwner(const char *name, Oid newOwnerId)
    2231                 :             : {
    2232                 :             :     Oid         pubid;
    2233                 :             :     HeapTuple   tup;
    2234                 :             :     Relation    rel;
    2235                 :             :     ObjectAddress address;
    2236                 :             :     Form_pg_publication pubform;
    2237                 :             : 
    2238                 :          26 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    2239                 :             : 
    2240                 :          26 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
    2241                 :             : 
    2242         [ -  + ]:          26 :     if (!HeapTupleIsValid(tup))
    2243         [ #  # ]:           0 :         ereport(ERROR,
    2244                 :             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2245                 :             :                  errmsg("publication \"%s\" does not exist", name)));
    2246                 :             : 
    2247                 :          26 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    2248                 :          26 :     pubid = pubform->oid;
    2249                 :             : 
    2250                 :          26 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
    2251                 :             : 
    2252                 :          22 :     ObjectAddressSet(address, PublicationRelationId, pubid);
    2253                 :             : 
    2254                 :          22 :     heap_freetuple(tup);
    2255                 :             : 
    2256                 :          22 :     table_close(rel, RowExclusiveLock);
    2257                 :             : 
    2258                 :          22 :     return address;
    2259                 :             : }
    2260                 :             : 
    2261                 :             : /*
    2262                 :             :  * Change publication owner -- by OID
    2263                 :             :  */
    2264                 :             : void
    2265                 :           0 : AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId)
    2266                 :             : {
    2267                 :             :     HeapTuple   tup;
    2268                 :             :     Relation    rel;
    2269                 :             : 
    2270                 :           0 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    2271                 :             : 
    2272                 :           0 :     tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    2273                 :             : 
    2274         [ #  # ]:           0 :     if (!HeapTupleIsValid(tup))
    2275         [ #  # ]:           0 :         ereport(ERROR,
    2276                 :             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2277                 :             :                  errmsg("publication with OID %u does not exist", pubid)));
    2278                 :             : 
    2279                 :           0 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
    2280                 :             : 
    2281                 :           0 :     heap_freetuple(tup);
    2282                 :             : 
    2283                 :           0 :     table_close(rel, RowExclusiveLock);
    2284                 :           0 : }
    2285                 :             : 
    2286                 :             : /*
    2287                 :             :  * Extract the publish_generated_columns option value from a DefElem. "stored"
    2288                 :             :  * and "none" values are accepted.
    2289                 :             :  */
    2290                 :             : static char
    2291                 :          49 : defGetGeneratedColsOption(DefElem *def)
    2292                 :             : {
    2293                 :          49 :     char       *sval = "";
    2294                 :             : 
    2295                 :             :     /*
    2296                 :             :      * A parameter value is required.
    2297                 :             :      */
    2298         [ +  + ]:          49 :     if (def->arg)
    2299                 :             :     {
    2300                 :          45 :         sval = defGetString(def);
    2301                 :             : 
    2302         [ +  + ]:          45 :         if (pg_strcasecmp(sval, "none") == 0)
    2303                 :          14 :             return PUBLISH_GENCOLS_NONE;
    2304         [ +  + ]:          31 :         if (pg_strcasecmp(sval, "stored") == 0)
    2305                 :          27 :             return PUBLISH_GENCOLS_STORED;
    2306                 :             :     }
    2307                 :             : 
    2308         [ +  - ]:           8 :     ereport(ERROR,
    2309                 :             :             errcode(ERRCODE_SYNTAX_ERROR),
    2310                 :             :             errmsg("invalid value for publication parameter \"%s\": \"%s\"", def->defname, sval),
    2311                 :             :             errdetail("Valid values are \"%s\" and \"%s\".", "none", "stored"));
    2312                 :             : 
    2313                 :             :     return PUBLISH_GENCOLS_NONE;    /* keep compiler quiet */
    2314                 :             : }
        

Generated by: LCOV version 2.0-1