LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_publication.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 99.4 % 501 498
Test Date: 2026-03-22 10:16:18 Functions: 100.0 % 34 34
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * pg_publication.c
       4              :  *      publication C API manipulation
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  * IDENTIFICATION
      10              :  *      src/backend/catalog/pg_publication.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres.h"
      16              : 
      17              : #include "access/genam.h"
      18              : #include "access/heapam.h"
      19              : #include "access/htup_details.h"
      20              : #include "access/tableam.h"
      21              : #include "catalog/catalog.h"
      22              : #include "catalog/dependency.h"
      23              : #include "catalog/indexing.h"
      24              : #include "catalog/namespace.h"
      25              : #include "catalog/objectaddress.h"
      26              : #include "catalog/partition.h"
      27              : #include "catalog/pg_inherits.h"
      28              : #include "catalog/pg_namespace.h"
      29              : #include "catalog/pg_publication.h"
      30              : #include "catalog/pg_publication_namespace.h"
      31              : #include "catalog/pg_publication_rel.h"
      32              : #include "catalog/pg_type.h"
      33              : #include "commands/publicationcmds.h"
      34              : #include "funcapi.h"
      35              : #include "utils/array.h"
      36              : #include "utils/builtins.h"
      37              : #include "utils/catcache.h"
      38              : #include "utils/fmgroids.h"
      39              : #include "utils/lsyscache.h"
      40              : #include "utils/rel.h"
      41              : #include "utils/syscache.h"
      42              : 
      43              : /* Records association between publication and published table */
      44              : typedef struct
      45              : {
      46              :     Oid         relid;          /* OID of published table */
      47              :     Oid         pubid;          /* OID of publication that publishes this
      48              :                                  * table. */
      49              : } published_rel;
      50              : 
      51              : /*
      52              :  * Check if relation can be in given publication and throws appropriate
      53              :  * error if not.
      54              :  */
      55              : static void
      56          754 : check_publication_add_relation(PublicationRelInfo *pri)
      57              : {
      58          754 :     Relation    targetrel = pri->relation;
      59              :     const char *errormsg;
      60              : 
      61          754 :     if (pri->except)
      62           53 :         errormsg = gettext_noop("cannot use publication EXCEPT clause for relation \"%s\"");
      63              :     else
      64          701 :         errormsg = gettext_noop("cannot add relation \"%s\" to publication");
      65              : 
      66              :     /* If in EXCEPT clause, must be root partitioned table */
      67          754 :     if (pri->except && targetrel->rd_rel->relispartition)
      68            4 :         ereport(ERROR,
      69              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      70              :                  errmsg(errormsg, RelationGetRelationName(targetrel)),
      71              :                  errdetail("This operation is not supported for individual partitions.")));
      72              : 
      73              :     /* Must be a regular or partitioned table */
      74          750 :     if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
      75           95 :         RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
      76            9 :         ereport(ERROR,
      77              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      78              :                  errmsg(errormsg, RelationGetRelationName(targetrel)),
      79              :                  errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind)));
      80              : 
      81              :     /* Can't be system table */
      82          741 :     if (IsCatalogRelation(targetrel))
      83            4 :         ereport(ERROR,
      84              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      85              :                  errmsg(errormsg, RelationGetRelationName(targetrel)),
      86              :                  errdetail("This operation is not supported for system tables.")));
      87              : 
      88              :     /* UNLOGGED and TEMP relations cannot be part of publication. */
      89          737 :     if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
      90            4 :         ereport(ERROR,
      91              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      92              :                  errmsg(errormsg, RelationGetRelationName(targetrel)),
      93              :                  errdetail("This operation is not supported for temporary tables.")));
      94          733 :     else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
      95            4 :         ereport(ERROR,
      96              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      97              :                  errmsg(errormsg, RelationGetRelationName(targetrel)),
      98              :                  errdetail("This operation is not supported for unlogged tables.")));
      99          729 : }
     100              : 
     101              : /*
     102              :  * Check if schema can be in given publication and throw appropriate error if
     103              :  * not.
     104              :  */
     105              : static void
     106          159 : check_publication_add_schema(Oid schemaid)
     107              : {
     108              :     /* Can't be system namespace */
     109          159 :     if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid))
     110            4 :         ereport(ERROR,
     111              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     112              :                  errmsg("cannot add schema \"%s\" to publication",
     113              :                         get_namespace_name(schemaid)),
     114              :                  errdetail("This operation is not supported for system schemas.")));
     115              : 
     116              :     /* Can't be temporary namespace */
     117          155 :     if (isAnyTempNamespace(schemaid))
     118            0 :         ereport(ERROR,
     119              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     120              :                  errmsg("cannot add schema \"%s\" to publication",
     121              :                         get_namespace_name(schemaid)),
     122              :                  errdetail("Temporary schemas cannot be replicated.")));
     123          155 : }
     124              : 
     125              : /*
     126              :  * Returns if relation represented by oid and Form_pg_class entry
     127              :  * is publishable.
     128              :  *
     129              :  * Does same checks as check_publication_add_relation() above except for
     130              :  * RELKIND_SEQUENCE, but does not need relation to be opened and also does
     131              :  * not throw errors. Here, the additional check is to support ALL SEQUENCES
     132              :  * publication.
     133              :  *
     134              :  * XXX  This also excludes all tables with relid < FirstNormalObjectId,
     135              :  * ie all tables created during initdb.  This mainly affects the preinstalled
     136              :  * information_schema.  IsCatalogRelationOid() only excludes tables with
     137              :  * relid < FirstUnpinnedObjectId, making that test rather redundant,
     138              :  * but really we should get rid of the FirstNormalObjectId test not
     139              :  * IsCatalogRelationOid.  We can't do so today because we don't want
     140              :  * information_schema tables to be considered publishable; but this test
     141              :  * is really inadequate for that, since the information_schema could be
     142              :  * dropped and reloaded and then it'll be considered publishable.  The best
     143              :  * long-term solution may be to add a "relispublishable" bool to pg_class,
     144              :  * and depend on that instead of OID checks.
     145              :  */
     146              : static bool
     147       312857 : is_publishable_class(Oid relid, Form_pg_class reltuple)
     148              : {
     149       321079 :     return (reltuple->relkind == RELKIND_RELATION ||
     150         8222 :             reltuple->relkind == RELKIND_PARTITIONED_TABLE ||
     151         7216 :             reltuple->relkind == RELKIND_SEQUENCE) &&
     152       307012 :         !IsCatalogRelationOid(relid) &&
     153       625714 :         reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
     154              :         relid >= FirstNormalObjectId;
     155              : }
     156              : 
     157              : /*
     158              :  * Another variant of is_publishable_class(), taking a Relation.
     159              :  */
     160              : bool
     161       275886 : is_publishable_relation(Relation rel)
     162              : {
     163       275886 :     return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
     164              : }
     165              : 
     166              : /*
     167              :  * SQL-callable variant of the above
     168              :  *
     169              :  * This returns null when the relation does not exist.  This is intended to be
     170              :  * used for example in psql to avoid gratuitous errors when there are
     171              :  * concurrent catalog changes.
     172              :  */
     173              : Datum
     174         4272 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
     175              : {
     176         4272 :     Oid         relid = PG_GETARG_OID(0);
     177              :     HeapTuple   tuple;
     178              :     bool        result;
     179              : 
     180         4272 :     tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
     181         4272 :     if (!HeapTupleIsValid(tuple))
     182            0 :         PG_RETURN_NULL();
     183         4272 :     result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
     184         4272 :     ReleaseSysCache(tuple);
     185         4272 :     PG_RETURN_BOOL(result);
     186              : }
     187              : 
     188              : /*
     189              :  * Returns true if the ancestor is in the list of published relations.
     190              :  * Otherwise, returns false.
     191              :  */
     192              : static bool
     193          107 : is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
     194              : {
     195              :     ListCell   *lc;
     196              : 
     197          355 :     foreach(lc, table_infos)
     198              :     {
     199          292 :         Oid         relid = ((published_rel *) lfirst(lc))->relid;
     200              : 
     201          292 :         if (relid == ancestor)
     202           44 :             return true;
     203              :     }
     204              : 
     205           63 :     return false;
     206              : }
     207              : 
     208              : /*
     209              :  * Filter out the partitions whose parent tables are also present in the list.
     210              :  */
     211              : static void
     212          185 : filter_partitions(List *table_infos)
     213              : {
     214              :     ListCell   *lc;
     215              : 
     216          539 :     foreach(lc, table_infos)
     217              :     {
     218          354 :         bool        skip = false;
     219          354 :         List       *ancestors = NIL;
     220              :         ListCell   *lc2;
     221          354 :         published_rel *table_info = (published_rel *) lfirst(lc);
     222              : 
     223          354 :         if (get_rel_relispartition(table_info->relid))
     224          107 :             ancestors = get_partition_ancestors(table_info->relid);
     225              : 
     226          417 :         foreach(lc2, ancestors)
     227              :         {
     228          107 :             Oid         ancestor = lfirst_oid(lc2);
     229              : 
     230          107 :             if (is_ancestor_member_tableinfos(ancestor, table_infos))
     231              :             {
     232           44 :                 skip = true;
     233           44 :                 break;
     234              :             }
     235              :         }
     236              : 
     237          354 :         if (skip)
     238           44 :             table_infos = foreach_delete_current(table_infos, lc);
     239              :     }
     240          185 : }
     241              : 
     242              : /*
     243              :  * Returns true if any schema is associated with the publication, false if no
     244              :  * schema is associated with the publication.
     245              :  */
     246              : bool
     247          211 : is_schema_publication(Oid pubid)
     248              : {
     249              :     Relation    pubschsrel;
     250              :     ScanKeyData scankey;
     251              :     SysScanDesc scan;
     252              :     HeapTuple   tup;
     253          211 :     bool        result = false;
     254              : 
     255          211 :     pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
     256          211 :     ScanKeyInit(&scankey,
     257              :                 Anum_pg_publication_namespace_pnpubid,
     258              :                 BTEqualStrategyNumber, F_OIDEQ,
     259              :                 ObjectIdGetDatum(pubid));
     260              : 
     261          211 :     scan = systable_beginscan(pubschsrel,
     262              :                               PublicationNamespacePnnspidPnpubidIndexId,
     263              :                               true, NULL, 1, &scankey);
     264          211 :     tup = systable_getnext(scan);
     265          211 :     result = HeapTupleIsValid(tup);
     266              : 
     267          211 :     systable_endscan(scan);
     268          211 :     table_close(pubschsrel, AccessShareLock);
     269              : 
     270          211 :     return result;
     271              : }
     272              : 
     273              : /*
     274              :  * Returns true if the publication has explicitly included relation (i.e.,
     275              :  * not marked as EXCEPT).
     276              :  */
     277              : bool
     278           45 : is_table_publication(Oid pubid)
     279              : {
     280              :     Relation    pubrelsrel;
     281              :     ScanKeyData scankey;
     282              :     SysScanDesc scan;
     283              :     HeapTuple   tup;
     284           45 :     bool        result = false;
     285              : 
     286           45 :     pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
     287           45 :     ScanKeyInit(&scankey,
     288              :                 Anum_pg_publication_rel_prpubid,
     289              :                 BTEqualStrategyNumber, F_OIDEQ,
     290              :                 ObjectIdGetDatum(pubid));
     291              : 
     292           45 :     scan = systable_beginscan(pubrelsrel,
     293              :                               PublicationRelPrpubidIndexId,
     294              :                               true, NULL, 1, &scankey);
     295           45 :     tup = systable_getnext(scan);
     296           45 :     if (HeapTupleIsValid(tup))
     297              :     {
     298              :         Form_pg_publication_rel pubrel;
     299              : 
     300           21 :         pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
     301              : 
     302              :         /*
     303              :          * For any publication, pg_publication_rel contains either only EXCEPT
     304              :          * entries or only explicitly included tables. Therefore, examining
     305              :          * the first tuple is sufficient to determine table inclusion.
     306              :          */
     307           21 :         result = !pubrel->prexcept;
     308              :     }
     309              : 
     310           45 :     systable_endscan(scan);
     311           45 :     table_close(pubrelsrel, AccessShareLock);
     312              : 
     313           45 :     return result;
     314              : }
     315              : 
     316              : /*
     317              :  * Returns true if the relation has column list associated with the
     318              :  * publication, false otherwise.
     319              :  *
     320              :  * If a column list is found, the corresponding bitmap is returned through the
     321              :  * cols parameter, if provided. The bitmap is constructed within the given
     322              :  * memory context (mcxt).
     323              :  */
     324              : bool
     325          917 : check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt,
     326              :                             Bitmapset **cols)
     327              : {
     328              :     HeapTuple   cftuple;
     329          917 :     bool        found = false;
     330              : 
     331          917 :     if (pub->alltables)
     332          206 :         return false;
     333              : 
     334          711 :     cftuple = SearchSysCache2(PUBLICATIONRELMAP,
     335              :                               ObjectIdGetDatum(relid),
     336              :                               ObjectIdGetDatum(pub->oid));
     337          711 :     if (HeapTupleIsValid(cftuple))
     338              :     {
     339              :         Datum       cfdatum;
     340              :         bool        isnull;
     341              : 
     342              :         /* Lookup the column list attribute. */
     343          653 :         cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
     344              :                                   Anum_pg_publication_rel_prattrs, &isnull);
     345              : 
     346              :         /* Was a column list found? */
     347          653 :         if (!isnull)
     348              :         {
     349              :             /* Build the column list bitmap in the given memory context. */
     350          193 :             if (cols)
     351          190 :                 *cols = pub_collist_to_bitmapset(*cols, cfdatum, mcxt);
     352              : 
     353          193 :             found = true;
     354              :         }
     355              : 
     356          653 :         ReleaseSysCache(cftuple);
     357              :     }
     358              : 
     359          711 :     return found;
     360              : }
     361              : 
     362              : /*
     363              :  * Gets the relations based on the publication partition option for a specified
     364              :  * relation.
     365              :  */
     366              : List *
     367         3503 : GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
     368              :                                Oid relid)
     369              : {
     370         3503 :     if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
     371              :         pub_partopt != PUBLICATION_PART_ROOT)
     372          755 :     {
     373          755 :         List       *all_parts = find_all_inheritors(relid, NoLock,
     374              :                                                     NULL);
     375              : 
     376          755 :         if (pub_partopt == PUBLICATION_PART_ALL)
     377          650 :             result = list_concat(result, all_parts);
     378          105 :         else if (pub_partopt == PUBLICATION_PART_LEAF)
     379              :         {
     380              :             ListCell   *lc;
     381              : 
     382          387 :             foreach(lc, all_parts)
     383              :             {
     384          282 :                 Oid         partOid = lfirst_oid(lc);
     385              : 
     386          282 :                 if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
     387          174 :                     result = lappend_oid(result, partOid);
     388              :             }
     389              :         }
     390              :         else
     391              :             Assert(false);
     392              :     }
     393              :     else
     394         2748 :         result = lappend_oid(result, relid);
     395              : 
     396         3503 :     return result;
     397              : }
     398              : 
     399              : /*
     400              :  * Returns the relid of the topmost ancestor that is published via this
     401              :  * publication if any and set its ancestor level to ancestor_level,
     402              :  * otherwise returns InvalidOid.
     403              :  *
     404              :  * The ancestor_level value allows us to compare the results for multiple
     405              :  * publications, and decide which value is higher up.
     406              :  *
     407              :  * Note that the list of ancestors should be ordered such that the topmost
     408              :  * ancestor is at the end of the list.
     409              :  */
     410              : Oid
     411          286 : GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
     412              : {
     413              :     ListCell   *lc;
     414          286 :     Oid         topmost_relid = InvalidOid;
     415          286 :     int         level = 0;
     416              : 
     417              :     /*
     418              :      * Find the "topmost" ancestor that is in this publication.
     419              :      */
     420          580 :     foreach(lc, ancestors)
     421              :     {
     422          294 :         Oid         ancestor = lfirst_oid(lc);
     423          294 :         List       *apubids = GetRelationIncludedPublications(ancestor);
     424          294 :         List       *aschemaPubids = NIL;
     425              : 
     426          294 :         level++;
     427              : 
     428          294 :         if (list_member_oid(apubids, puboid))
     429              :         {
     430          187 :             topmost_relid = ancestor;
     431              : 
     432          187 :             if (ancestor_level)
     433           43 :                 *ancestor_level = level;
     434              :         }
     435              :         else
     436              :         {
     437          107 :             aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
     438          107 :             if (list_member_oid(aschemaPubids, puboid))
     439              :             {
     440            5 :                 topmost_relid = ancestor;
     441              : 
     442            5 :                 if (ancestor_level)
     443            5 :                     *ancestor_level = level;
     444              :             }
     445              :         }
     446              : 
     447          294 :         list_free(apubids);
     448          294 :         list_free(aschemaPubids);
     449              :     }
     450              : 
     451          286 :     return topmost_relid;
     452              : }
     453              : 
     454              : /*
     455              :  * attnumstoint2vector
     456              :  *      Convert a Bitmapset of AttrNumbers into an int2vector.
     457              :  *
     458              :  * AttrNumber numbers are 0-based, i.e., not offset by
     459              :  * FirstLowInvalidHeapAttributeNumber.
     460              :  */
     461              : static int2vector *
     462          208 : attnumstoint2vector(Bitmapset *attrs)
     463              : {
     464              :     int2vector *result;
     465          208 :     int         n = bms_num_members(attrs);
     466          208 :     int         i = -1;
     467          208 :     int         j = 0;
     468              : 
     469          208 :     result = buildint2vector(NULL, n);
     470              : 
     471          565 :     while ((i = bms_next_member(attrs, i)) >= 0)
     472              :     {
     473              :         Assert(i <= PG_INT16_MAX);
     474              : 
     475          357 :         result->values[j++] = (int16) i;
     476              :     }
     477              : 
     478          208 :     return result;
     479              : }
     480              : 
     481              : /*
     482              :  * Insert new publication / relation mapping.
     483              :  */
     484              : ObjectAddress
     485          772 : publication_add_relation(Oid pubid, PublicationRelInfo *pri,
     486              :                          bool if_not_exists, AlterPublicationStmt *alter_stmt)
     487              : {
     488              :     Relation    rel;
     489              :     HeapTuple   tup;
     490              :     Datum       values[Natts_pg_publication_rel];
     491              :     bool        nulls[Natts_pg_publication_rel];
     492          772 :     Relation    targetrel = pri->relation;
     493          772 :     Oid         relid = RelationGetRelid(targetrel);
     494              :     Oid         pubreloid;
     495              :     Bitmapset  *attnums;
     496          772 :     Publication *pub = GetPublication(pubid);
     497              :     ObjectAddress myself,
     498              :                 referenced;
     499          772 :     List       *relids = NIL;
     500              :     int         i;
     501              :     bool        inval_except_table;
     502              : 
     503          772 :     rel = table_open(PublicationRelRelationId, RowExclusiveLock);
     504              : 
     505              :     /*
     506              :      * Check for duplicates. Note that this does not really prevent
     507              :      * duplicates, it's here just to provide nicer error message in common
     508              :      * case. The real protection is the unique key on the catalog.
     509              :      */
     510          772 :     if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
     511              :                               ObjectIdGetDatum(pubid)))
     512              :     {
     513           18 :         table_close(rel, RowExclusiveLock);
     514              : 
     515           18 :         if (if_not_exists)
     516           14 :             return InvalidObjectAddress;
     517              : 
     518            4 :         ereport(ERROR,
     519              :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     520              :                  errmsg("relation \"%s\" is already member of publication \"%s\"",
     521              :                         RelationGetRelationName(targetrel), pub->name)));
     522              :     }
     523              : 
     524          754 :     check_publication_add_relation(pri);
     525              : 
     526              :     /* Validate and translate column names into a Bitmapset of attnums. */
     527          729 :     attnums = pub_collist_validate(pri->relation, pri->columns);
     528              : 
     529              :     /* Form a tuple. */
     530          713 :     memset(values, 0, sizeof(values));
     531          713 :     memset(nulls, false, sizeof(nulls));
     532              : 
     533          713 :     pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
     534              :                                    Anum_pg_publication_rel_oid);
     535          713 :     values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid);
     536          713 :     values[Anum_pg_publication_rel_prpubid - 1] =
     537          713 :         ObjectIdGetDatum(pubid);
     538          713 :     values[Anum_pg_publication_rel_prrelid - 1] =
     539          713 :         ObjectIdGetDatum(relid);
     540          713 :     values[Anum_pg_publication_rel_prexcept - 1] =
     541          713 :         BoolGetDatum(pri->except);
     542              : 
     543              :     /* Add qualifications, if available */
     544          713 :     if (pri->whereClause != NULL)
     545          211 :         values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
     546              :     else
     547          502 :         nulls[Anum_pg_publication_rel_prqual - 1] = true;
     548              : 
     549              :     /* Add column list, if available */
     550          713 :     if (pri->columns)
     551          208 :         values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(attnumstoint2vector(attnums));
     552              :     else
     553          505 :         nulls[Anum_pg_publication_rel_prattrs - 1] = true;
     554              : 
     555          713 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     556              : 
     557              :     /* Insert tuple into catalog. */
     558          713 :     CatalogTupleInsert(rel, tup);
     559          713 :     heap_freetuple(tup);
     560              : 
     561              :     /* Register dependencies as needed */
     562          713 :     ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
     563              : 
     564              :     /* Add dependency on the publication */
     565          713 :     ObjectAddressSet(referenced, PublicationRelationId, pubid);
     566          713 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     567              : 
     568              :     /* Add dependency on the relation */
     569          713 :     ObjectAddressSet(referenced, RelationRelationId, relid);
     570          713 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     571              : 
     572              :     /* Add dependency on the objects mentioned in the qualifications */
     573          713 :     if (pri->whereClause)
     574          211 :         recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
     575              :                                         DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
     576              :                                         false);
     577              : 
     578              :     /* Add dependency on the columns, if any are listed */
     579          713 :     i = -1;
     580         1070 :     while ((i = bms_next_member(attnums, i)) >= 0)
     581              :     {
     582          357 :         ObjectAddressSubSet(referenced, RelationRelationId, relid, i);
     583          357 :         recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
     584              :     }
     585              : 
     586              :     /* Close the table. */
     587          713 :     table_close(rel, RowExclusiveLock);
     588              : 
     589              :     /*
     590              :      * Determine whether EXCEPT tables require explicit relcache invalidation.
     591              :      *
     592              :      * For CREATE PUBLICATION with EXCEPT tables, invalidation is skipped
     593              :      * here, as CreatePublication() function invalidates all relations as part
     594              :      * of defining a FOR ALL TABLES publication.
     595              :      *
     596              :      * For ALTER PUBLICATION, invalidation is needed only when adding an
     597              :      * EXCEPT table to a publication already marked as ALL TABLES. For
     598              :      * publications that were originally empty or defined as ALL SEQUENCES and
     599              :      * are being converted to ALL TABLES, invalidation is skipped here, as
     600              :      * AlterPublicationAllFlags() function invalidates all relations while
     601              :      * marking the publication as ALL TABLES publication.
     602              :      */
     603          714 :     inval_except_table = (alter_stmt != NULL) && pub->alltables &&
     604            1 :         (alter_stmt->for_all_tables && pri->except);
     605              : 
     606          713 :     if (!pri->except || inval_except_table)
     607              :     {
     608              :         /*
     609              :          * Invalidate relcache so that publication info is rebuilt.
     610              :          *
     611              :          * For the partitioned tables, we must invalidate all partitions
     612              :          * contained in the respective partition hierarchies, not just the one
     613              :          * explicitly mentioned in the publication. This is required because
     614              :          * we implicitly publish the child tables when the parent table is
     615              :          * published.
     616              :          */
     617          665 :         relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
     618              :                                                 relid);
     619              : 
     620          665 :         InvalidatePublicationRels(relids);
     621              :     }
     622              : 
     623          713 :     return myself;
     624              : }
     625              : 
     626              : /*
     627              :  * pub_collist_validate
     628              :  *      Process and validate the 'columns' list and ensure the columns are all
     629              :  *      valid to use for a publication.  Checks for and raises an ERROR for
     630              :  *      any unknown columns, system columns, duplicate columns, or virtual
     631              :  *      generated columns.
     632              :  *
     633              :  * Looks up each column's attnum and returns a 0-based Bitmapset of the
     634              :  * corresponding attnums.
     635              :  */
     636              : Bitmapset *
     637         1007 : pub_collist_validate(Relation targetrel, List *columns)
     638              : {
     639         1007 :     Bitmapset  *set = NULL;
     640              :     ListCell   *lc;
     641         1007 :     TupleDesc   tupdesc = RelationGetDescr(targetrel);
     642              : 
     643         1544 :     foreach(lc, columns)
     644              :     {
     645          561 :         char       *colname = strVal(lfirst(lc));
     646          561 :         AttrNumber  attnum = get_attnum(RelationGetRelid(targetrel), colname);
     647              : 
     648          561 :         if (attnum == InvalidAttrNumber)
     649            4 :             ereport(ERROR,
     650              :                     errcode(ERRCODE_UNDEFINED_COLUMN),
     651              :                     errmsg("column \"%s\" of relation \"%s\" does not exist",
     652              :                            colname, RelationGetRelationName(targetrel)));
     653              : 
     654          557 :         if (!AttrNumberIsForUserDefinedAttr(attnum))
     655            8 :             ereport(ERROR,
     656              :                     errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     657              :                     errmsg("cannot use system column \"%s\" in publication column list",
     658              :                            colname));
     659              : 
     660          549 :         if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
     661            4 :             ereport(ERROR,
     662              :                     errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     663              :                     errmsg("cannot use virtual generated column \"%s\" in publication column list",
     664              :                            colname));
     665              : 
     666          545 :         if (bms_is_member(attnum, set))
     667            8 :             ereport(ERROR,
     668              :                     errcode(ERRCODE_DUPLICATE_OBJECT),
     669              :                     errmsg("duplicate column \"%s\" in publication column list",
     670              :                            colname));
     671              : 
     672          537 :         set = bms_add_member(set, attnum);
     673              :     }
     674              : 
     675          983 :     return set;
     676              : }
     677              : 
     678              : /*
     679              :  * Transform a column list (represented by an array Datum) to a bitmapset.
     680              :  *
     681              :  * If columns isn't NULL, add the column numbers to that set.
     682              :  *
     683              :  * If mcxt isn't NULL, build the bitmapset in that context.
     684              :  */
     685              : Bitmapset *
     686          280 : pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
     687              : {
     688          280 :     Bitmapset  *result = columns;
     689              :     ArrayType  *arr;
     690              :     int         nelems;
     691              :     int16      *elems;
     692          280 :     MemoryContext oldcxt = NULL;
     693              : 
     694          280 :     arr = DatumGetArrayTypeP(pubcols);
     695          280 :     nelems = ARR_DIMS(arr)[0];
     696          280 :     elems = (int16 *) ARR_DATA_PTR(arr);
     697              : 
     698              :     /* If a memory context was specified, switch to it. */
     699          280 :     if (mcxt)
     700           41 :         oldcxt = MemoryContextSwitchTo(mcxt);
     701              : 
     702          770 :     for (int i = 0; i < nelems; i++)
     703          490 :         result = bms_add_member(result, elems[i]);
     704              : 
     705          280 :     if (mcxt)
     706           41 :         MemoryContextSwitchTo(oldcxt);
     707              : 
     708          280 :     return result;
     709              : }
     710              : 
     711              : /*
     712              :  * Returns a bitmap representing the columns of the specified table.
     713              :  *
     714              :  * Generated columns are included if include_gencols_type is
     715              :  * PUBLISH_GENCOLS_STORED.
     716              :  */
     717              : Bitmapset *
     718            9 : pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type)
     719              : {
     720            9 :     Bitmapset  *result = NULL;
     721            9 :     TupleDesc   desc = RelationGetDescr(relation);
     722              : 
     723           30 :     for (int i = 0; i < desc->natts; i++)
     724              :     {
     725           21 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     726              : 
     727           21 :         if (att->attisdropped)
     728            1 :             continue;
     729              : 
     730           20 :         if (att->attgenerated)
     731              :         {
     732              :             /* We only support replication of STORED generated cols. */
     733            2 :             if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
     734            1 :                 continue;
     735              : 
     736              :             /* User hasn't requested to replicate STORED generated cols. */
     737            1 :             if (include_gencols_type != PUBLISH_GENCOLS_STORED)
     738            1 :                 continue;
     739              :         }
     740              : 
     741           18 :         result = bms_add_member(result, att->attnum);
     742              :     }
     743              : 
     744            9 :     return result;
     745              : }
     746              : 
     747              : /*
     748              :  * Insert new publication / schema mapping.
     749              :  */
     750              : ObjectAddress
     751          171 : publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
     752              : {
     753              :     Relation    rel;
     754              :     HeapTuple   tup;
     755              :     Datum       values[Natts_pg_publication_namespace];
     756              :     bool        nulls[Natts_pg_publication_namespace];
     757              :     Oid         psschid;
     758          171 :     Publication *pub = GetPublication(pubid);
     759          171 :     List       *schemaRels = NIL;
     760              :     ObjectAddress myself,
     761              :                 referenced;
     762              : 
     763          171 :     rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
     764              : 
     765              :     /*
     766              :      * Check for duplicates. Note that this does not really prevent
     767              :      * duplicates, it's here just to provide nicer error message in common
     768              :      * case. The real protection is the unique key on the catalog.
     769              :      */
     770          171 :     if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
     771              :                               ObjectIdGetDatum(schemaid),
     772              :                               ObjectIdGetDatum(pubid)))
     773              :     {
     774           12 :         table_close(rel, RowExclusiveLock);
     775              : 
     776           12 :         if (if_not_exists)
     777            8 :             return InvalidObjectAddress;
     778              : 
     779            4 :         ereport(ERROR,
     780              :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     781              :                  errmsg("schema \"%s\" is already member of publication \"%s\"",
     782              :                         get_namespace_name(schemaid), pub->name)));
     783              :     }
     784              : 
     785          159 :     check_publication_add_schema(schemaid);
     786              : 
     787              :     /* Form a tuple */
     788          155 :     memset(values, 0, sizeof(values));
     789          155 :     memset(nulls, false, sizeof(nulls));
     790              : 
     791          155 :     psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId,
     792              :                                  Anum_pg_publication_namespace_oid);
     793          155 :     values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid);
     794          155 :     values[Anum_pg_publication_namespace_pnpubid - 1] =
     795          155 :         ObjectIdGetDatum(pubid);
     796          155 :     values[Anum_pg_publication_namespace_pnnspid - 1] =
     797          155 :         ObjectIdGetDatum(schemaid);
     798              : 
     799          155 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     800              : 
     801              :     /* Insert tuple into catalog */
     802          155 :     CatalogTupleInsert(rel, tup);
     803          155 :     heap_freetuple(tup);
     804              : 
     805          155 :     ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid);
     806              : 
     807              :     /* Add dependency on the publication */
     808          155 :     ObjectAddressSet(referenced, PublicationRelationId, pubid);
     809          155 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     810              : 
     811              :     /* Add dependency on the schema */
     812          155 :     ObjectAddressSet(referenced, NamespaceRelationId, schemaid);
     813          155 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     814              : 
     815              :     /* Close the table */
     816          155 :     table_close(rel, RowExclusiveLock);
     817              : 
     818              :     /*
     819              :      * Invalidate relcache so that publication info is rebuilt. See
     820              :      * publication_add_relation for why we need to consider all the
     821              :      * partitions.
     822              :      */
     823          155 :     schemaRels = GetSchemaPublicationRelations(schemaid,
     824              :                                                PUBLICATION_PART_ALL);
     825          155 :     InvalidatePublicationRels(schemaRels);
     826              : 
     827          155 :     return myself;
     828              : }
     829              : 
     830              : /*
     831              :  * Internal function to get the list of publication oids for a relation.
     832              :  *
     833              :  * If except_flag is true, returns the list of publication that specified the
     834              :  * relation in EXCEPT clause; otherwise, returns the list of publications
     835              :  * in which relation is included.
     836              :  */
     837              : static List *
     838        15539 : get_relation_publications(Oid relid, bool except_flag)
     839              : {
     840        15539 :     List       *result = NIL;
     841              :     CatCList   *pubrellist;
     842              : 
     843              :     /* Find all publications associated with the relation. */
     844        15539 :     pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
     845              :                                      ObjectIdGetDatum(relid));
     846        16993 :     for (int i = 0; i < pubrellist->n_members; i++)
     847              :     {
     848         1454 :         HeapTuple   tup = &pubrellist->members[i]->tuple;
     849         1454 :         Form_pg_publication_rel pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
     850         1454 :         Oid         pubid = pubrel->prpubid;
     851              : 
     852         1454 :         if (pubrel->prexcept == except_flag)
     853         1020 :             result = lappend_oid(result, pubid);
     854              :     }
     855              : 
     856        15539 :     ReleaseSysCacheList(pubrellist);
     857              : 
     858        15539 :     return result;
     859              : }
     860              : 
     861              : /*
     862              :  * Gets list of publication oids for a relation.
     863              :  */
     864              : List *
     865         8335 : GetRelationIncludedPublications(Oid relid)
     866              : {
     867         8335 :     return get_relation_publications(relid, false);
     868              : }
     869              : 
     870              : /*
     871              :  * Gets list of publication oids which has relation in EXCEPT clause.
     872              :  */
     873              : List *
     874         7204 : GetRelationExcludedPublications(Oid relid)
     875              : {
     876         7204 :     return get_relation_publications(relid, true);
     877              : }
     878              : 
     879              : /*
     880              :  * Internal function to get the list of relation oids for a publication.
     881              :  *
     882              :  * If except_flag is true, returns the list of relations specified in the
     883              :  * EXCEPT clause of the publication; otherwise, returns the list of relations
     884              :  * included in the publication.
     885              :  */
     886              : static List *
     887         1539 : get_publication_relations(Oid pubid, PublicationPartOpt pub_partopt,
     888              :                           bool except_flag)
     889              : {
     890              :     List       *result;
     891              :     Relation    pubrelsrel;
     892              :     ScanKeyData scankey;
     893              :     SysScanDesc scan;
     894              :     HeapTuple   tup;
     895              : 
     896              :     /* Find all relations associated with the publication. */
     897         1539 :     pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
     898              : 
     899         1539 :     ScanKeyInit(&scankey,
     900              :                 Anum_pg_publication_rel_prpubid,
     901              :                 BTEqualStrategyNumber, F_OIDEQ,
     902              :                 ObjectIdGetDatum(pubid));
     903              : 
     904         1539 :     scan = systable_beginscan(pubrelsrel, PublicationRelPrpubidIndexId,
     905              :                               true, NULL, 1, &scankey);
     906              : 
     907         1539 :     result = NIL;
     908         4793 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     909              :     {
     910              :         Form_pg_publication_rel pubrel;
     911              : 
     912         1715 :         pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
     913              : 
     914         1715 :         if (except_flag == pubrel->prexcept)
     915         1715 :             result = GetPubPartitionOptionRelations(result, pub_partopt,
     916              :                                                     pubrel->prrelid);
     917              :     }
     918              : 
     919         1539 :     systable_endscan(scan);
     920         1539 :     table_close(pubrelsrel, AccessShareLock);
     921              : 
     922              :     /* Now sort and de-duplicate the result list */
     923         1539 :     list_sort(result, list_oid_cmp);
     924         1539 :     list_deduplicate_oid(result);
     925              : 
     926         1539 :     return result;
     927              : }
     928              : 
     929              : /*
     930              :  * Gets list of relation oids that are associated with a publication.
     931              :  *
     932              :  * This should only be used FOR TABLE publications, the FOR ALL TABLES/SEQUENCES
     933              :  * should use GetAllPublicationRelations().
     934              :  */
     935              : List *
     936         1319 : GetIncludedPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
     937              : {
     938              :     Assert(!GetPublication(pubid)->alltables);
     939              : 
     940         1319 :     return get_publication_relations(pubid, pub_partopt, false);
     941              : }
     942              : 
     943              : /*
     944              :  * Gets list of table oids that were specified in the EXCEPT clause for a
     945              :  * publication.
     946              :  *
     947              :  * This should only be used FOR ALL TABLES publications.
     948              :  */
     949              : List *
     950          220 : GetExcludedPublicationTables(Oid pubid, PublicationPartOpt pub_partopt)
     951              : {
     952              :     Assert(GetPublication(pubid)->alltables);
     953              : 
     954          220 :     return get_publication_relations(pubid, pub_partopt, true);
     955              : }
     956              : 
     957              : /*
     958              :  * Gets list of publication oids for publications marked as FOR ALL TABLES.
     959              :  */
     960              : List *
     961         5518 : GetAllTablesPublications(void)
     962              : {
     963              :     List       *result;
     964              :     Relation    rel;
     965              :     ScanKeyData scankey;
     966              :     SysScanDesc scan;
     967              :     HeapTuple   tup;
     968              : 
     969              :     /* Find all publications that are marked as for all tables. */
     970         5518 :     rel = table_open(PublicationRelationId, AccessShareLock);
     971              : 
     972         5518 :     ScanKeyInit(&scankey,
     973              :                 Anum_pg_publication_puballtables,
     974              :                 BTEqualStrategyNumber, F_BOOLEQ,
     975              :                 BoolGetDatum(true));
     976              : 
     977         5518 :     scan = systable_beginscan(rel, InvalidOid, false,
     978              :                               NULL, 1, &scankey);
     979              : 
     980         5518 :     result = NIL;
     981         5631 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     982              :     {
     983          113 :         Oid         oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
     984              : 
     985          113 :         result = lappend_oid(result, oid);
     986              :     }
     987              : 
     988         5518 :     systable_endscan(scan);
     989         5518 :     table_close(rel, AccessShareLock);
     990              : 
     991         5518 :     return result;
     992              : }
     993              : 
     994              : /*
     995              :  * Gets list of all relations published by FOR ALL TABLES/SEQUENCES
     996              :  * publication.
     997              :  *
     998              :  * If the publication publishes partition changes via their respective root
     999              :  * partitioned tables, we must exclude partitions in favor of including the
    1000              :  * root partitioned tables. This is not applicable to FOR ALL SEQUENCES
    1001              :  * publication.
    1002              :  *
    1003              :  * For a FOR ALL TABLES publication, the returned list excludes tables mentioned
    1004              :  * in EXCEPT TABLE clause.
    1005              :  */
    1006              : List *
    1007          209 : GetAllPublicationRelations(Oid pubid, char relkind, bool pubviaroot)
    1008              : {
    1009              :     Relation    classRel;
    1010              :     ScanKeyData key[1];
    1011              :     TableScanDesc scan;
    1012              :     HeapTuple   tuple;
    1013          209 :     List       *result = NIL;
    1014          209 :     List       *exceptlist = NIL;
    1015              : 
    1016              :     Assert(!(relkind == RELKIND_SEQUENCE && pubviaroot));
    1017              : 
    1018              :     /* EXCEPT filtering applies only to relations, not sequences */
    1019          209 :     if (relkind == RELKIND_RELATION)
    1020          203 :         exceptlist = GetExcludedPublicationTables(pubid, pubviaroot ?
    1021          203 :                                                   PUBLICATION_PART_ROOT :
    1022              :                                                   PUBLICATION_PART_LEAF);
    1023              : 
    1024          209 :     classRel = table_open(RelationRelationId, AccessShareLock);
    1025              : 
    1026          209 :     ScanKeyInit(&key[0],
    1027              :                 Anum_pg_class_relkind,
    1028              :                 BTEqualStrategyNumber, F_CHAREQ,
    1029              :                 CharGetDatum(relkind));
    1030              : 
    1031          209 :     scan = table_beginscan_catalog(classRel, 1, key);
    1032              : 
    1033        15963 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    1034              :     {
    1035        15754 :         Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
    1036        15754 :         Oid         relid = relForm->oid;
    1037              : 
    1038        15754 :         if (is_publishable_class(relid, relForm) &&
    1039          929 :             !(relForm->relispartition && pubviaroot) &&
    1040          832 :             !list_member_oid(exceptlist, relid))
    1041          798 :             result = lappend_oid(result, relid);
    1042              :     }
    1043              : 
    1044          209 :     table_endscan(scan);
    1045              : 
    1046          209 :     if (pubviaroot)
    1047              :     {
    1048           16 :         ScanKeyInit(&key[0],
    1049              :                     Anum_pg_class_relkind,
    1050              :                     BTEqualStrategyNumber, F_CHAREQ,
    1051              :                     CharGetDatum(RELKIND_PARTITIONED_TABLE));
    1052              : 
    1053           16 :         scan = table_beginscan_catalog(classRel, 1, key);
    1054              : 
    1055           87 :         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    1056              :         {
    1057           71 :             Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
    1058           71 :             Oid         relid = relForm->oid;
    1059              : 
    1060           71 :             if (is_publishable_class(relid, relForm) &&
    1061           71 :                 !relForm->relispartition &&
    1062           55 :                 !list_member_oid(exceptlist, relid))
    1063           52 :                 result = lappend_oid(result, relid);
    1064              :         }
    1065              : 
    1066           16 :         table_endscan(scan);
    1067              :     }
    1068              : 
    1069          209 :     table_close(classRel, AccessShareLock);
    1070          209 :     return result;
    1071              : }
    1072              : 
    1073              : /*
    1074              :  * Gets the list of schema oids for a publication.
    1075              :  *
    1076              :  * This should only be used FOR TABLES IN SCHEMA publications.
    1077              :  */
    1078              : List *
    1079         1281 : GetPublicationSchemas(Oid pubid)
    1080              : {
    1081         1281 :     List       *result = NIL;
    1082              :     Relation    pubschsrel;
    1083              :     ScanKeyData scankey;
    1084              :     SysScanDesc scan;
    1085              :     HeapTuple   tup;
    1086              : 
    1087              :     /* Find all schemas associated with the publication */
    1088         1281 :     pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
    1089              : 
    1090         1281 :     ScanKeyInit(&scankey,
    1091              :                 Anum_pg_publication_namespace_pnpubid,
    1092              :                 BTEqualStrategyNumber, F_OIDEQ,
    1093              :                 ObjectIdGetDatum(pubid));
    1094              : 
    1095         1281 :     scan = systable_beginscan(pubschsrel,
    1096              :                               PublicationNamespacePnnspidPnpubidIndexId,
    1097              :                               true, NULL, 1, &scankey);
    1098         1339 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
    1099              :     {
    1100              :         Form_pg_publication_namespace pubsch;
    1101              : 
    1102           58 :         pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
    1103              : 
    1104           58 :         result = lappend_oid(result, pubsch->pnnspid);
    1105              :     }
    1106              : 
    1107         1281 :     systable_endscan(scan);
    1108         1281 :     table_close(pubschsrel, AccessShareLock);
    1109              : 
    1110         1281 :     return result;
    1111              : }
    1112              : 
    1113              : /*
    1114              :  * Gets the list of publication oids associated with a specified schema.
    1115              :  */
    1116              : List *
    1117         8039 : GetSchemaPublications(Oid schemaid)
    1118              : {
    1119         8039 :     List       *result = NIL;
    1120              :     CatCList   *pubschlist;
    1121              :     int         i;
    1122              : 
    1123              :     /* Find all publications associated with the schema */
    1124         8039 :     pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
    1125              :                                      ObjectIdGetDatum(schemaid));
    1126         8103 :     for (i = 0; i < pubschlist->n_members; i++)
    1127              :     {
    1128           64 :         HeapTuple   tup = &pubschlist->members[i]->tuple;
    1129           64 :         Oid         pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
    1130              : 
    1131           64 :         result = lappend_oid(result, pubid);
    1132              :     }
    1133              : 
    1134         8039 :     ReleaseSysCacheList(pubschlist);
    1135              : 
    1136         8039 :     return result;
    1137              : }
    1138              : 
    1139              : /*
    1140              :  * Get the list of publishable relation oids for a specified schema.
    1141              :  */
    1142              : List *
    1143          320 : GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
    1144              : {
    1145              :     Relation    classRel;
    1146              :     ScanKeyData key[1];
    1147              :     TableScanDesc scan;
    1148              :     HeapTuple   tuple;
    1149          320 :     List       *result = NIL;
    1150              : 
    1151              :     Assert(OidIsValid(schemaid));
    1152              : 
    1153          320 :     classRel = table_open(RelationRelationId, AccessShareLock);
    1154              : 
    1155          320 :     ScanKeyInit(&key[0],
    1156              :                 Anum_pg_class_relnamespace,
    1157              :                 BTEqualStrategyNumber, F_OIDEQ,
    1158              :                 ObjectIdGetDatum(schemaid));
    1159              : 
    1160              :     /* get all the relations present in the specified schema */
    1161          320 :     scan = table_beginscan_catalog(classRel, 1, key);
    1162        17194 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    1163              :     {
    1164        16874 :         Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
    1165        16874 :         Oid         relid = relForm->oid;
    1166              :         char        relkind;
    1167              : 
    1168        16874 :         if (!is_publishable_class(relid, relForm))
    1169         5795 :             continue;
    1170              : 
    1171        11079 :         relkind = get_rel_relkind(relid);
    1172        11079 :         if (relkind == RELKIND_RELATION)
    1173         9510 :             result = lappend_oid(result, relid);
    1174         1569 :         else if (relkind == RELKIND_PARTITIONED_TABLE)
    1175              :         {
    1176          511 :             List       *partitionrels = NIL;
    1177              : 
    1178              :             /*
    1179              :              * It is quite possible that some of the partitions are in a
    1180              :              * different schema than the parent table, so we need to get such
    1181              :              * partitions separately.
    1182              :              */
    1183          511 :             partitionrels = GetPubPartitionOptionRelations(partitionrels,
    1184              :                                                            pub_partopt,
    1185              :                                                            relForm->oid);
    1186          511 :             result = list_concat_unique_oid(result, partitionrels);
    1187              :         }
    1188              :     }
    1189              : 
    1190          320 :     table_endscan(scan);
    1191          320 :     table_close(classRel, AccessShareLock);
    1192          320 :     return result;
    1193              : }
    1194              : 
    1195              : /*
    1196              :  * Gets the list of all relations published by FOR TABLES IN SCHEMA
    1197              :  * publication.
    1198              :  */
    1199              : List *
    1200          992 : GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
    1201              : {
    1202          992 :     List       *result = NIL;
    1203          992 :     List       *pubschemalist = GetPublicationSchemas(pubid);
    1204              :     ListCell   *cell;
    1205              : 
    1206         1030 :     foreach(cell, pubschemalist)
    1207              :     {
    1208           38 :         Oid         schemaid = lfirst_oid(cell);
    1209           38 :         List       *schemaRels = NIL;
    1210              : 
    1211           38 :         schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt);
    1212           38 :         result = list_concat(result, schemaRels);
    1213              :     }
    1214              : 
    1215          992 :     return result;
    1216              : }
    1217              : 
    1218              : /*
    1219              :  * Get publication using oid
    1220              :  *
    1221              :  * The Publication struct and its data are palloc'ed here.
    1222              :  */
    1223              : Publication *
    1224         5330 : GetPublication(Oid pubid)
    1225              : {
    1226              :     HeapTuple   tup;
    1227              :     Publication *pub;
    1228              :     Form_pg_publication pubform;
    1229              : 
    1230         5330 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1231         5330 :     if (!HeapTupleIsValid(tup))
    1232            0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
    1233              : 
    1234         5330 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1235              : 
    1236         5330 :     pub = palloc_object(Publication);
    1237         5330 :     pub->oid = pubid;
    1238         5330 :     pub->name = pstrdup(NameStr(pubform->pubname));
    1239         5330 :     pub->alltables = pubform->puballtables;
    1240         5330 :     pub->allsequences = pubform->puballsequences;
    1241         5330 :     pub->pubactions.pubinsert = pubform->pubinsert;
    1242         5330 :     pub->pubactions.pubupdate = pubform->pubupdate;
    1243         5330 :     pub->pubactions.pubdelete = pubform->pubdelete;
    1244         5330 :     pub->pubactions.pubtruncate = pubform->pubtruncate;
    1245         5330 :     pub->pubviaroot = pubform->pubviaroot;
    1246         5330 :     pub->pubgencols_type = pubform->pubgencols;
    1247              : 
    1248         5330 :     ReleaseSysCache(tup);
    1249              : 
    1250         5330 :     return pub;
    1251              : }
    1252              : 
    1253              : /*
    1254              :  * Get Publication using name.
    1255              :  */
    1256              : Publication *
    1257         1661 : GetPublicationByName(const char *pubname, bool missing_ok)
    1258              : {
    1259              :     Oid         oid;
    1260              : 
    1261         1661 :     oid = get_publication_oid(pubname, missing_ok);
    1262              : 
    1263         1660 :     return OidIsValid(oid) ? GetPublication(oid) : NULL;
    1264              : }
    1265              : 
    1266              : /*
    1267              :  * Get information of the tables in the given publication array.
    1268              :  *
    1269              :  * Returns pubid, relid, column list, row filter for each table.
    1270              :  */
    1271              : Datum
    1272         3251 : pg_get_publication_tables(PG_FUNCTION_ARGS)
    1273              : {
    1274              : #define NUM_PUBLICATION_TABLES_ELEM 4
    1275              :     FuncCallContext *funcctx;
    1276         3251 :     List       *table_infos = NIL;
    1277              : 
    1278              :     /* stuff done only on the first call of the function */
    1279         3251 :     if (SRF_IS_FIRSTCALL())
    1280              :     {
    1281              :         TupleDesc   tupdesc;
    1282              :         MemoryContext oldcontext;
    1283              :         ArrayType  *arr;
    1284              :         Datum      *elems;
    1285              :         int         nelems,
    1286              :                     i;
    1287         1068 :         bool        viaroot = false;
    1288              : 
    1289              :         /* create a function context for cross-call persistence */
    1290         1068 :         funcctx = SRF_FIRSTCALL_INIT();
    1291              : 
    1292              :         /* switch to memory context appropriate for multiple function calls */
    1293         1068 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    1294              : 
    1295              :         /*
    1296              :          * Deconstruct the parameter into elements where each element is a
    1297              :          * publication name.
    1298              :          */
    1299         1068 :         arr = PG_GETARG_ARRAYTYPE_P(0);
    1300         1068 :         deconstruct_array_builtin(arr, TEXTOID, &elems, NULL, &nelems);
    1301              : 
    1302              :         /* Get Oids of tables from each publication. */
    1303         2183 :         for (i = 0; i < nelems; i++)
    1304              :         {
    1305              :             Publication *pub_elem;
    1306         1116 :             List       *pub_elem_tables = NIL;
    1307              :             ListCell   *lc;
    1308              : 
    1309         1116 :             pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]), false);
    1310              : 
    1311              :             /*
    1312              :              * Publications support partitioned tables. If
    1313              :              * publish_via_partition_root is false, all changes are replicated
    1314              :              * using leaf partition identity and schema, so we only need
    1315              :              * those. Otherwise, get the partitioned table itself.
    1316              :              */
    1317         1115 :             if (pub_elem->alltables)
    1318          203 :                 pub_elem_tables = GetAllPublicationRelations(pub_elem->oid,
    1319              :                                                              RELKIND_RELATION,
    1320          203 :                                                              pub_elem->pubviaroot);
    1321              :             else
    1322              :             {
    1323              :                 List       *relids,
    1324              :                            *schemarelids;
    1325              : 
    1326          912 :                 relids = GetIncludedPublicationRelations(pub_elem->oid,
    1327          912 :                                                          pub_elem->pubviaroot ?
    1328          912 :                                                          PUBLICATION_PART_ROOT :
    1329              :                                                          PUBLICATION_PART_LEAF);
    1330          912 :                 schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
    1331          912 :                                                                 pub_elem->pubviaroot ?
    1332          912 :                                                                 PUBLICATION_PART_ROOT :
    1333              :                                                                 PUBLICATION_PART_LEAF);
    1334          912 :                 pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
    1335              :             }
    1336              : 
    1337              :             /*
    1338              :              * Record the published table and the corresponding publication so
    1339              :              * that we can get row filters and column lists later.
    1340              :              *
    1341              :              * When a table is published by multiple publications, to obtain
    1342              :              * all row filters and column lists, the structure related to this
    1343              :              * table will be recorded multiple times.
    1344              :              */
    1345         3342 :             foreach(lc, pub_elem_tables)
    1346              :             {
    1347         2227 :                 published_rel *table_info = palloc_object(published_rel);
    1348              : 
    1349         2227 :                 table_info->relid = lfirst_oid(lc);
    1350         2227 :                 table_info->pubid = pub_elem->oid;
    1351         2227 :                 table_infos = lappend(table_infos, table_info);
    1352              :             }
    1353              : 
    1354              :             /* At least one publication is using publish_via_partition_root. */
    1355         1115 :             if (pub_elem->pubviaroot)
    1356          193 :                 viaroot = true;
    1357              :         }
    1358              : 
    1359              :         /*
    1360              :          * If the publication publishes partition changes via their respective
    1361              :          * root partitioned tables, we must exclude partitions in favor of
    1362              :          * including the root partitioned tables. Otherwise, the function
    1363              :          * could return both the child and parent tables which could cause
    1364              :          * data of the child table to be double-published on the subscriber
    1365              :          * side.
    1366              :          */
    1367         1067 :         if (viaroot)
    1368          185 :             filter_partitions(table_infos);
    1369              : 
    1370              :         /* Construct a tuple descriptor for the result rows. */
    1371         1067 :         tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
    1372         1067 :         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
    1373              :                            OIDOID, -1, 0);
    1374         1067 :         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
    1375              :                            OIDOID, -1, 0);
    1376         1067 :         TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
    1377              :                            INT2VECTOROID, -1, 0);
    1378         1067 :         TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
    1379              :                            PG_NODE_TREEOID, -1, 0);
    1380              : 
    1381         1067 :         TupleDescFinalize(tupdesc);
    1382         1067 :         funcctx->tuple_desc = BlessTupleDesc(tupdesc);
    1383         1067 :         funcctx->user_fctx = table_infos;
    1384              : 
    1385         1067 :         MemoryContextSwitchTo(oldcontext);
    1386              :     }
    1387              : 
    1388              :     /* stuff done on every call of the function */
    1389         3250 :     funcctx = SRF_PERCALL_SETUP();
    1390         3250 :     table_infos = (List *) funcctx->user_fctx;
    1391              : 
    1392         3250 :     if (funcctx->call_cntr < list_length(table_infos))
    1393              :     {
    1394         2183 :         HeapTuple   pubtuple = NULL;
    1395              :         HeapTuple   rettuple;
    1396              :         Publication *pub;
    1397         2183 :         published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
    1398         2183 :         Oid         relid = table_info->relid;
    1399         2183 :         Oid         schemaid = get_rel_namespace(relid);
    1400         2183 :         Datum       values[NUM_PUBLICATION_TABLES_ELEM] = {0};
    1401         2183 :         bool        nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
    1402              : 
    1403              :         /*
    1404              :          * Form tuple with appropriate data.
    1405              :          */
    1406              : 
    1407         2183 :         pub = GetPublication(table_info->pubid);
    1408              : 
    1409         2183 :         values[0] = ObjectIdGetDatum(pub->oid);
    1410         2183 :         values[1] = ObjectIdGetDatum(relid);
    1411              : 
    1412              :         /*
    1413              :          * We don't consider row filters or column lists for FOR ALL TABLES or
    1414              :          * FOR TABLES IN SCHEMA publications.
    1415              :          */
    1416         2183 :         if (!pub->alltables &&
    1417         1350 :             !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
    1418              :                                    ObjectIdGetDatum(schemaid),
    1419              :                                    ObjectIdGetDatum(pub->oid)))
    1420         1300 :             pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
    1421              :                                            ObjectIdGetDatum(relid),
    1422              :                                            ObjectIdGetDatum(pub->oid));
    1423              : 
    1424         2183 :         if (HeapTupleIsValid(pubtuple))
    1425              :         {
    1426              :             /* Lookup the column list attribute. */
    1427         1187 :             values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
    1428              :                                         Anum_pg_publication_rel_prattrs,
    1429              :                                         &(nulls[2]));
    1430              : 
    1431              :             /* Null indicates no filter. */
    1432         1187 :             values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
    1433              :                                         Anum_pg_publication_rel_prqual,
    1434              :                                         &(nulls[3]));
    1435              :         }
    1436              :         else
    1437              :         {
    1438          996 :             nulls[2] = true;
    1439          996 :             nulls[3] = true;
    1440              :         }
    1441              : 
    1442              :         /* Show all columns when the column list is not specified. */
    1443         2183 :         if (nulls[2])
    1444              :         {
    1445         2057 :             Relation    rel = table_open(relid, AccessShareLock);
    1446         2057 :             int         nattnums = 0;
    1447              :             int16      *attnums;
    1448         2057 :             TupleDesc   desc = RelationGetDescr(rel);
    1449              :             int         i;
    1450              : 
    1451         2057 :             attnums = palloc_array(int16, desc->natts);
    1452              : 
    1453         5596 :             for (i = 0; i < desc->natts; i++)
    1454              :             {
    1455         3539 :                 Form_pg_attribute att = TupleDescAttr(desc, i);
    1456              : 
    1457         3539 :                 if (att->attisdropped)
    1458            6 :                     continue;
    1459              : 
    1460         3533 :                 if (att->attgenerated)
    1461              :                 {
    1462              :                     /* We only support replication of STORED generated cols. */
    1463           48 :                     if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
    1464           36 :                         continue;
    1465              : 
    1466              :                     /*
    1467              :                      * User hasn't requested to replicate STORED generated
    1468              :                      * cols.
    1469              :                      */
    1470           12 :                     if (pub->pubgencols_type != PUBLISH_GENCOLS_STORED)
    1471            9 :                         continue;
    1472              :                 }
    1473              : 
    1474         3488 :                 attnums[nattnums++] = att->attnum;
    1475              :             }
    1476              : 
    1477         2057 :             if (nattnums > 0)
    1478              :             {
    1479         2035 :                 values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
    1480         2035 :                 nulls[2] = false;
    1481              :             }
    1482              : 
    1483         2057 :             table_close(rel, AccessShareLock);
    1484              :         }
    1485              : 
    1486         2183 :         rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
    1487              : 
    1488         2183 :         SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
    1489              :     }
    1490              : 
    1491         1067 :     SRF_RETURN_DONE(funcctx);
    1492              : }
    1493              : 
    1494              : /*
    1495              :  * Returns Oids of sequences in a publication.
    1496              :  */
    1497              : Datum
    1498          233 : pg_get_publication_sequences(PG_FUNCTION_ARGS)
    1499              : {
    1500              :     FuncCallContext *funcctx;
    1501          233 :     List       *sequences = NIL;
    1502              : 
    1503              :     /* stuff done only on the first call of the function */
    1504          233 :     if (SRF_IS_FIRSTCALL())
    1505              :     {
    1506          216 :         char       *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1507              :         Publication *publication;
    1508              :         MemoryContext oldcontext;
    1509              : 
    1510              :         /* create a function context for cross-call persistence */
    1511          216 :         funcctx = SRF_FIRSTCALL_INIT();
    1512              : 
    1513              :         /* switch to memory context appropriate for multiple function calls */
    1514          216 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    1515              : 
    1516          216 :         publication = GetPublicationByName(pubname, false);
    1517              : 
    1518          216 :         if (publication->allsequences)
    1519            6 :             sequences = GetAllPublicationRelations(publication->oid,
    1520              :                                                    RELKIND_SEQUENCE,
    1521              :                                                    false);
    1522              : 
    1523          216 :         funcctx->user_fctx = sequences;
    1524              : 
    1525          216 :         MemoryContextSwitchTo(oldcontext);
    1526              :     }
    1527              : 
    1528              :     /* stuff done on every call of the function */
    1529          233 :     funcctx = SRF_PERCALL_SETUP();
    1530          233 :     sequences = (List *) funcctx->user_fctx;
    1531              : 
    1532          233 :     if (funcctx->call_cntr < list_length(sequences))
    1533              :     {
    1534           17 :         Oid         relid = list_nth_oid(sequences, funcctx->call_cntr);
    1535              : 
    1536           17 :         SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
    1537              :     }
    1538              : 
    1539          216 :     SRF_RETURN_DONE(funcctx);
    1540              : }
        

Generated by: LCOV version 2.0-1