LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_publication.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 99.4 % 541 538
Test Date: 2026-05-25 04:16:11 Functions: 100.0 % 38 38
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * pg_publication.c
       4              :  *      publication C API manipulation
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  * IDENTIFICATION
      10              :  *      src/backend/catalog/pg_publication.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres.h"
      16              : 
      17              : #include "access/genam.h"
      18              : #include "access/heapam.h"
      19              : #include "access/htup_details.h"
      20              : #include "access/tableam.h"
      21              : #include "catalog/catalog.h"
      22              : #include "catalog/dependency.h"
      23              : #include "catalog/indexing.h"
      24              : #include "catalog/namespace.h"
      25              : #include "catalog/objectaddress.h"
      26              : #include "catalog/partition.h"
      27              : #include "catalog/pg_inherits.h"
      28              : #include "catalog/pg_namespace.h"
      29              : #include "catalog/pg_publication.h"
      30              : #include "catalog/pg_publication_namespace.h"
      31              : #include "catalog/pg_publication_rel.h"
      32              : #include "catalog/pg_type.h"
      33              : #include "commands/publicationcmds.h"
      34              : #include "funcapi.h"
      35              : #include "utils/array.h"
      36              : #include "utils/builtins.h"
      37              : #include "utils/catcache.h"
      38              : #include "utils/fmgroids.h"
      39              : #include "utils/lsyscache.h"
      40              : #include "utils/rel.h"
      41              : #include "utils/syscache.h"
      42              : 
      43              : /* Pairs a published table with the publication that publishes it */
      44              : typedef struct
      45              : {
      46              :     Oid         relid;          /* OID of the published table */
      47              :     Oid         pubid;          /* OID of the publication that publishes
      48              :                                  * this table */
      49              : } published_rel;
      50              : 
      51              : /*
      52              :  * Check whether pri->relation may be added to a publication (or named in
      53              :  * its EXCEPT clause), and throw an appropriate error if it may not.
      54              :  */
      55              : static void
      56          808 : check_publication_add_relation(PublicationRelInfo *pri)
      57              : {
      58          808 :     Relation    targetrel = pri->relation;
      59              :     const char *relname;
      60              :     const char *errormsg;
      61              : 
      62          808 :     if (pri->except)
      63              :     {
      64           81 :         relname = RelationGetQualifiedRelationName(targetrel);
      65           81 :         errormsg = gettext_noop("cannot specify relation \"%s\" in the publication EXCEPT clause");
      66              :     }
      67              :     else
      68              :     {
      69          727 :         relname = RelationGetRelationName(targetrel);
      70          727 :         errormsg = gettext_noop("cannot add relation \"%s\" to publication");
      71              :     }
      72              : 
      73              :     /* A relation named in the EXCEPT clause must not be a partition */
      74          808 :     if (pri->except && targetrel->rd_rel->relispartition)
      75            4 :         ereport(ERROR,
      76              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      77              :                  errmsg(errormsg, relname),
      78              :                  errdetail("This operation is not supported for individual partitions.")));
      79              : 
      80              :     /* Must be a regular or partitioned table */
      81          804 :     if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
      82          115 :         RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
      83            9 :         ereport(ERROR,
      84              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      85              :                  errmsg(errormsg, relname),
      86              :                  errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind)));
      87              : 
      88              :     /* Can't be a system table */
      89          795 :     if (IsCatalogRelation(targetrel))
      90            4 :         ereport(ERROR,
      91              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      92              :                  errmsg(errormsg, relname),
      93              :                  errdetail("This operation is not supported for system tables.")));
      94              : 
      95              :     /* TEMP and UNLOGGED relations cannot be part of a publication. */
      96          791 :     if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
      97            4 :         ereport(ERROR,
      98              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      99              :                  errmsg(errormsg, relname),
     100              :                  errdetail("This operation is not supported for temporary tables.")));
     101          787 :     else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
     102            4 :         ereport(ERROR,
     103              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     104              :                  errmsg(errormsg, relname),
     105              :                  errdetail("This operation is not supported for unlogged tables.")));
     106          783 : }
     107              : 
     108              : /*
     109              :  * Check whether the schema may be added to a publication, and throw an
     110              :  * error if it is a catalog, TOAST, or temporary namespace.
     111              :  */
     112              : static void
     113          163 : check_publication_add_schema(Oid schemaid)
     114              : {
     115              :     /* Can't be a system (catalog or TOAST) namespace */
     116          163 :     if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid))
     117            4 :         ereport(ERROR,
     118              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     119              :                  errmsg("cannot add schema \"%s\" to publication",
     120              :                         get_namespace_name(schemaid)),
     121              :                  errdetail("This operation is not supported for system schemas.")));
     122              : 
     123              :     /* Can't be a temporary namespace */
     124          159 :     if (isAnyTempNamespace(schemaid))
     125            0 :         ereport(ERROR,
     126              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     127              :                  errmsg("cannot add schema \"%s\" to publication",
     128              :                         get_namespace_name(schemaid)),
     129              :                  errdetail("Temporary schemas cannot be replicated.")));
     130          159 : }
     131              : 
     132              : /*
     133              :  * Returns true if the relation identified by relid, with the given
     134              :  * pg_class entry, is publishable.
     135              :  *
     136              :  * Does similar checks to check_publication_add_relation() above, but also
     137              :  * accepts RELKIND_SEQUENCE, does not need the relation to be opened, and
     138              :  * does not throw errors.  Sequences are accepted in order to support ALL
     139              :  * SEQUENCES publications.
     140              :  *
     141              :  * XXX  This also excludes all tables with relid < FirstNormalObjectId,
     142              :  * i.e., all tables created during initdb.  This mainly affects the preinstalled
     143              :  * information_schema.  IsCatalogRelationOid() only excludes tables with
     144              :  * relid < FirstUnpinnedObjectId, making that test rather redundant,
     145              :  * but really we should get rid of the FirstNormalObjectId test not
     146              :  * IsCatalogRelationOid.  We can't do so today because we don't want
     147              :  * information_schema tables to be considered publishable; but this test
     148              :  * is really inadequate for that, since the information_schema could be
     149              :  * dropped and reloaded and then it'll be considered publishable.  The best
     150              :  * long-term solution may be to add a "relispublishable" bool to pg_class,
     151              :  * and depend on that instead of OID checks.
     152              :  */
     153              : static bool
     154       323172 : is_publishable_class(Oid relid, Form_pg_class reltuple)
     155              : {
     156       331444 :     return (reltuple->relkind == RELKIND_RELATION ||
     157         8272 :             reltuple->relkind == RELKIND_PARTITIONED_TABLE ||
     158         7264 :             reltuple->relkind == RELKIND_SEQUENCE) &&
     159       317279 :         !IsCatalogRelationOid(relid) &&
     160       646344 :         reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
     161              :         relid >= FirstNormalObjectId;
     162              : }
     163              : 
     164              : /*
     165              :  * Convenience wrapper around is_publishable_class() that takes a Relation.
     166              :  */
     167              : bool
     168       297657 : is_publishable_relation(Relation rel)
     169              : {
     170       297657 :     return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
     171              : }
     172              : 
     173              : /*
     174              :  * Like is_publishable_class(), but looks the relation up by OID and accepts
     175              :  * only publishable tables: sequences and nonexistent OIDs yield false.
     176              :  */
     177              : static bool
     178          554 : is_publishable_table(Oid tableoid)
     179              : {
     180              :     HeapTuple   tuple;
     181              :     Form_pg_class relform;
     182              : 
     183          554 :     tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(tableoid));
     184          554 :     if (!HeapTupleIsValid(tuple))
     185            4 :         return false;
     186              : 
     187          550 :     relform = (Form_pg_class) GETSTRUCT(tuple);
     188              : 
     189              :     /*
     190              :      * is_publishable_class() accepts sequences, so the relkind must be
     191              :      * checked explicitly here to filter them out.
     192              :      */
     193         1100 :     if (relform->relkind != RELKIND_SEQUENCE &&
     194          550 :         is_publishable_class(tableoid, relform))
     195              :     {
     196          546 :         ReleaseSysCache(tuple);
     197          546 :         return true;
     198              :     }
     199              : 
     200            4 :     ReleaseSysCache(tuple);
     201            4 :     return false;
     202              : }
     203              : 
     204              : /*
     205              :  * SQL-callable variant of is_publishable_class()
     206              :  *
     207              :  * This returns null when the relation does not exist, which lets callers
     208              :  * such as psql avoid gratuitous errors when there are concurrent catalog
     209              :  * changes.  Unlike is_publishable_table(), sequences are not filtered out.
     210              :  */
     211              : Datum
     212         4288 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
     213              : {
     214         4288 :     Oid         relid = PG_GETARG_OID(0);
     215              :     HeapTuple   tuple;
     216              :     bool        result;
     217              : 
     218         4288 :     tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
     219         4288 :     if (!HeapTupleIsValid(tuple))
     220            0 :         PG_RETURN_NULL();
     221         4288 :     result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
     222         4288 :     ReleaseSysCache(tuple);
     223         4288 :     PG_RETURN_BOOL(result);
     224              : }
     225              : 
     226              : /*
     227              :  * Returns true if 'ancestor' matches the relid of any published_rel entry
     228              :  * in 'table_infos'; otherwise returns false.
     229              :  */
     230              : static bool
     231           61 : is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
     232              : {
     233              :     ListCell   *lc;
     234              : 
     235          278 :     foreach(lc, table_infos)
     236              :     {
     237          247 :         Oid         relid = ((published_rel *) lfirst(lc))->relid;
     238              : 
     239          247 :         if (relid == ancestor)
     240           30 :             return true;
     241              :     }
     242              : 
     243           31 :     return false;
     244              : }
     245              : 
     246              : /*
     247              :  * Remove from the list any partition that has an ancestor also in the list.
     248              :  */
     249              : static void
     250          169 : filter_partitions(List *table_infos)
     251              : {
     252              :     ListCell   *lc;
     253              : 
     254          399 :     foreach(lc, table_infos)
     255              :     {
     256          230 :         bool        skip = false;
     257          230 :         List       *ancestors = NIL;
     258              :         ListCell   *lc2;
     259          230 :         published_rel *table_info = (published_rel *) lfirst(lc);
     260              : 
     261          230 :         if (get_rel_relispartition(table_info->relid))
     262           61 :             ancestors = get_partition_ancestors(table_info->relid);
     263              : 
     264          261 :         foreach(lc2, ancestors)
     265              :         {
     266           61 :             Oid         ancestor = lfirst_oid(lc2);
     267              : 
     268           61 :             if (is_ancestor_member_tableinfos(ancestor, table_infos))
     269              :             {
     270           30 :                 skip = true;
     271           30 :                 break;
     272              :             }
     273              :         }
     274              : 
     275          230 :         if (skip)
     276           30 :             table_infos = foreach_delete_current(table_infos, lc);
     277              :     }
     278          169 : }
     279              : 
     280              : /*
     281              :  * Returns true if pg_publication_namespace has at least one entry for the
     282              :  * publication, i.e., some schema is associated with it; false otherwise.
     283              :  */
     284              : bool
     285          215 : is_schema_publication(Oid pubid)
     286              : {
     287              :     Relation    pubschsrel;
     288              :     ScanKeyData scankey;
     289              :     SysScanDesc scan;
     290              :     HeapTuple   tup;
     291          215 :     bool        result = false;
     292              : 
     293          215 :     pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
     294          215 :     ScanKeyInit(&scankey,
     295              :                 Anum_pg_publication_namespace_pnpubid,
     296              :                 BTEqualStrategyNumber, F_OIDEQ,
     297              :                 ObjectIdGetDatum(pubid));
     298              : 
     299          215 :     scan = systable_beginscan(pubschsrel,
     300              :                               PublicationNamespacePnnspidPnpubidIndexId,
     301              :                               true, NULL, 1, &scankey);
     302          215 :     tup = systable_getnext(scan);
     303          215 :     result = HeapTupleIsValid(tup);
     304              : 
     305          215 :     systable_endscan(scan);
     306          215 :     table_close(pubschsrel, AccessShareLock);
     307              : 
     308          215 :     return result;
     309              : }
     310              : 
     311              : /*
     312              :  * Returns true if the publication has at least one explicitly included
     313              :  * relation (i.e., a pg_publication_rel entry not marked as EXCEPT).
     314              :  */
     315              : bool
     316           49 : is_table_publication(Oid pubid)
     317              : {
     318              :     Relation    pubrelsrel;
     319              :     ScanKeyData scankey;
     320              :     SysScanDesc scan;
     321              :     HeapTuple   tup;
     322           49 :     bool        result = false;
     323              : 
     324           49 :     pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
     325           49 :     ScanKeyInit(&scankey,
     326              :                 Anum_pg_publication_rel_prpubid,
     327              :                 BTEqualStrategyNumber, F_OIDEQ,
     328              :                 ObjectIdGetDatum(pubid));
     329              : 
     330           49 :     scan = systable_beginscan(pubrelsrel,
     331              :                               PublicationRelPrpubidIndexId,
     332              :                               true, NULL, 1, &scankey);
     333           49 :     tup = systable_getnext(scan);
     334           49 :     if (HeapTupleIsValid(tup))
     335              :     {
     336              :         Form_pg_publication_rel pubrel;
     337              : 
     338           25 :         pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
     339              : 
     340              :         /*
     341              :          * For any publication, pg_publication_rel contains either only EXCEPT
     342              :          * entries or only explicitly included tables.  Therefore, examining
     343              :          * the first tuple is sufficient to determine table inclusion.
     344              :          */
     345           25 :         result = !pubrel->prexcept;
     346              :     }
     347              : 
     348           49 :     systable_endscan(scan);
     349           49 :     table_close(pubrelsrel, AccessShareLock);
     350              : 
     351           49 :     return result;
     352              : }
     353              : 
     354              : /*
     355              :  * Returns true if the relation has a column list associated with the
     356              :  * publication, false otherwise (always false when pub->alltables is set).
     357              :  *
     358              :  * If a column list is found and cols is not NULL, the corresponding bitmap
     359              :  * is returned through cols; it is built by pub_collist_to_bitmapset(),
     360              :  * which is given the current *cols and the memory context (mcxt).
     361              :  */
     362              : bool
     363          924 : check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt,
     364              :                             Bitmapset **cols)
     365              : {
     366              :     HeapTuple   cftuple;
     367          924 :     bool        found = false;
     368              : 
     369          924 :     if (pub->alltables)
     370          221 :         return false;
     371              : 
     372          703 :     cftuple = SearchSysCache2(PUBLICATIONRELMAP,
     373              :                               ObjectIdGetDatum(relid),
     374              :                               ObjectIdGetDatum(pub->oid));
     375          703 :     if (HeapTupleIsValid(cftuple))
     376              :     {
     377              :         Datum       cfdatum;
     378              :         bool        isnull;
     379              : 
     380              :         /* Look up the column list attribute. */
     381          645 :         cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
     382              :                                   Anum_pg_publication_rel_prattrs, &isnull);
     383              : 
     384              :         /* Was a column list found? */
     385          645 :         if (!isnull)
     386              :         {
     387              :             /* Build the bitmap only if the caller asked for it. */
     388          193 :             if (cols)
     389          190 :                 *cols = pub_collist_to_bitmapset(*cols, cfdatum, mcxt);
     390              : 
     391          193 :             found = true;
     392              :         }
     393              : 
     394          645 :         ReleaseSysCache(cftuple);
     395              :     }
     396              : 
     397          703 :     return found;
     398              : }
     399              : 
     400              : /*
     401              :  * Appends to 'result' the relation OIDs that 'relid' stands for under the
     402              :  * given publication partition option, and returns the resulting list.
     403              :  */
     404              : List *
     405         2498 : GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
     406              :                                Oid relid)
     407              : {
     408         2498 :     if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
     409              :         pub_partopt != PUBLICATION_PART_ROOT)
     410          701 :     {
     411          701 :         List       *all_parts = find_all_inheritors(relid, NoLock,
     412              :                                                     NULL);
     413              : 
     414          701 :         if (pub_partopt == PUBLICATION_PART_ALL)
     415          682 :             result = list_concat(result, all_parts);
     416           19 :         else if (pub_partopt == PUBLICATION_PART_LEAF)
     417              :         {
     418              :             ListCell   *lc;
     419              : 
     420           69 :             foreach(lc, all_parts)
     421              :             {
     422           50 :                 Oid         partOid = lfirst_oid(lc);
     423              : 
     424           50 :                 if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
     425           30 :                     result = lappend_oid(result, partOid);
     426              :             }
     427              :         }
     428              :         else
     429              :             Assert(false);
     430              :     }
     431              :     else
     432         1797 :         result = lappend_oid(result, relid);
     433              : 
     434         2498 :     return result;
     435              : }
     436              : 
     437              : /*
     438              :  * Returns the relid of the topmost ancestor that is published via this
     439              :  * publication, if any, and stores its level in *ancestor_level when that
     440              :  * pointer is not NULL; otherwise returns InvalidOid.
     441              :  *
     442              :  * The ancestor_level value allows us to compare the results for multiple
     443              :  * publications, and decide which value is higher up.
     444              :  *
     445              :  * Note that the list of ancestors should be ordered such that the topmost
     446              :  * ancestor is at the end of the list.
     447              :  */
     448              : Oid
     449          426 : GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
     450              : {
     451              :     ListCell   *lc;
     452          426 :     Oid         topmost_relid = InvalidOid;
     453          426 :     int         level = 0;
     454              : 
     455              :     /*
     456              :      * Scan all ancestors; the last match is the "topmost" one published.
     457              :      */
     458          862 :     foreach(lc, ancestors)
     459              :     {
     460          436 :         Oid         ancestor = lfirst_oid(lc);
     461          436 :         List       *apubids = GetRelationIncludedPublications(ancestor);
     462          436 :         List       *aschemaPubids = NIL;
     463              : 
     464          436 :         level++;
     465              : 
     466          436 :         if (list_member_oid(apubids, puboid))
     467              :         {
     468          223 :             topmost_relid = ancestor;
     469              : 
     470          223 :             if (ancestor_level)
     471           43 :                 *ancestor_level = level;
     472              :         }
     473              :         else
     474              :         {
     475          213 :             aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
     476          213 :             if (list_member_oid(aschemaPubids, puboid))
     477              :             {
     478           13 :                 topmost_relid = ancestor;
     479              : 
     480           13 :                 if (ancestor_level)
     481            5 :                     *ancestor_level = level;
     482              :             }
     483              :         }
     484              : 
     485          436 :         list_free(apubids);
     486          436 :         list_free(aschemaPubids);
     487              :     }
     488              : 
     489          426 :     return topmost_relid;
     490              : }
     491              : 
     492              : /*
     493              :  * attnumstoint2vector
     494              :  *      Convert a Bitmapset of AttrNumbers into an int2vector.
     495              :  *
     496              :  * The Bitmapset members are used as attribute numbers as-is, i.e., they
     497              :  * are not offset by FirstLowInvalidHeapAttributeNumber.
     498              :  */
     499              : static int2vector *
     500          212 : attnumstoint2vector(Bitmapset *attrs)
     501              : {
     502              :     int2vector *result;
     503          212 :     int         n = bms_num_members(attrs);
     504          212 :     int         i = -1;
     505          212 :     int         j = 0;
     506              : 
     507          212 :     result = buildint2vector(NULL, n);
     508              : 
     509          577 :     while ((i = bms_next_member(attrs, i)) >= 0)
     510              :     {
     511              :         Assert(i <= PG_INT16_MAX);
     512              : 
     513          365 :         result->values[j++] = (int16) i;
     514              :     }
     515              : 
     516          212 :     return result;
     517              : }
     518              : 
     519              : /*
     520              :  * Insert new publication / relation mapping.
     521              :  */
     522              : ObjectAddress
     523          830 : publication_add_relation(Oid pubid, PublicationRelInfo *pri,
     524              :                          bool if_not_exists, AlterPublicationStmt *alter_stmt)
     525              : {
     526              :     Relation    rel;
     527              :     HeapTuple   tup;
     528              :     Datum       values[Natts_pg_publication_rel];
     529              :     bool        nulls[Natts_pg_publication_rel];
     530          830 :     Relation    targetrel = pri->relation;
     531          830 :     Oid         relid = RelationGetRelid(targetrel);
     532              :     Oid         pubreloid;
     533              :     Bitmapset  *attnums;
     534          830 :     Publication *pub = GetPublication(pubid);
     535              :     ObjectAddress myself,
     536              :                 referenced;
     537          830 :     List       *relids = NIL;
     538              :     int         i;
     539              :     bool        inval_except_table;
     540              : 
     541          830 :     rel = table_open(PublicationRelRelationId, RowExclusiveLock);
     542              : 
     543              :     /*
     544              :      * Check for duplicates. Note that this does not really prevent
     545              :      * duplicates, it's here just to provide nicer error message in common
     546              :      * case. The real protection is the unique key on the catalog.
     547              :      */
     548          830 :     if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
     549              :                               ObjectIdGetDatum(pubid)))
     550              :     {
     551           22 :         table_close(rel, RowExclusiveLock);
     552              : 
     553           22 :         if (if_not_exists)
     554           18 :             return InvalidObjectAddress;
     555              : 
     556            4 :         ereport(ERROR,
     557              :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     558              :                  errmsg("relation \"%s\" is already member of publication \"%s\"",
     559              :                         RelationGetRelationName(targetrel), pub->name)));
     560              :     }
     561              : 
     562          808 :     check_publication_add_relation(pri);
     563              : 
     564              :     /* Validate and translate column names into a Bitmapset of attnums. */
     565          783 :     attnums = pub_collist_validate(pri->relation, pri->columns);
     566              : 
     567              :     /* Form a tuple. */
     568          767 :     memset(values, 0, sizeof(values));
     569          767 :     memset(nulls, false, sizeof(nulls));
     570              : 
     571          767 :     pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
     572              :                                    Anum_pg_publication_rel_oid);
     573          767 :     values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid);
     574          767 :     values[Anum_pg_publication_rel_prpubid - 1] =
     575          767 :         ObjectIdGetDatum(pubid);
     576          767 :     values[Anum_pg_publication_rel_prrelid - 1] =
     577          767 :         ObjectIdGetDatum(relid);
     578          767 :     values[Anum_pg_publication_rel_prexcept - 1] =
     579          767 :         BoolGetDatum(pri->except);
     580              : 
     581              :     /* Add qualifications, if available */
     582          767 :     if (pri->whereClause != NULL)
     583          219 :         values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
     584              :     else
     585          548 :         nulls[Anum_pg_publication_rel_prqual - 1] = true;
     586              : 
     587              :     /* Add column list, if available */
     588          767 :     if (pri->columns)
     589          212 :         values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(attnumstoint2vector(attnums));
     590              :     else
     591          555 :         nulls[Anum_pg_publication_rel_prattrs - 1] = true;
     592              : 
     593          767 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     594              : 
     595              :     /* Insert tuple into catalog. */
     596          767 :     CatalogTupleInsert(rel, tup);
     597          767 :     heap_freetuple(tup);
     598              : 
     599              :     /* Register dependencies as needed */
     600          767 :     ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
     601              : 
     602              :     /* Add dependency on the publication */
     603          767 :     ObjectAddressSet(referenced, PublicationRelationId, pubid);
     604          767 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     605              : 
     606              :     /* Add dependency on the relation */
     607          767 :     ObjectAddressSet(referenced, RelationRelationId, relid);
     608          767 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     609              : 
     610              :     /* Add dependency on the objects mentioned in the qualifications */
     611          767 :     if (pri->whereClause)
     612          219 :         recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
     613              :                                         DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
     614              :                                         false);
     615              : 
     616              :     /* Add dependency on the columns, if any are listed */
     617          767 :     i = -1;
     618         1132 :     while ((i = bms_next_member(attnums, i)) >= 0)
     619              :     {
     620          365 :         ObjectAddressSubSet(referenced, RelationRelationId, relid, i);
     621          365 :         recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
     622              :     }
     623              : 
     624              :     /* Close the table. */
     625          767 :     table_close(rel, RowExclusiveLock);
     626              : 
     627              :     /*
     628              :      * Determine whether EXCEPT tables require explicit relcache invalidation.
     629              :      *
     630              :      * For CREATE PUBLICATION with EXCEPT tables, invalidation is skipped
     631              :      * here, because CreatePublication() invalidates all relations as part
     632              :      * of defining a FOR ALL TABLES publication.
     633              :      *
     634              :      * For ALTER PUBLICATION, invalidation is needed only when adding an
     635              :      * EXCEPT table to a publication already marked as ALL TABLES. For
     636              :      * publications that were originally empty or defined as ALL SEQUENCES and
     637              :      * are being converted to ALL TABLES, invalidation is skipped here, because
     638              :      * AlterPublicationAllFlags() invalidates all relations while marking the
     639              :      * publication as an ALL TABLES publication.
     640              :      */
     641          776 :     inval_except_table = (alter_stmt != NULL) && pub->alltables &&
     642            9 :         (alter_stmt->for_all_tables && pri->except);
     643              : 
     644          767 :     if (!pri->except || inval_except_table)
     645              :     {
     646              :         /*
     647              :          * Invalidate relcache so that publication info is rebuilt.
     648              :          *
     649              :          * For the partitioned tables, we must invalidate all partitions
     650              :          * contained in the respective partition hierarchies, not just the one
     651              :          * explicitly mentioned in the publication. This is required because
     652              :          * we implicitly publish the child tables when the parent table is
     653              :          * published.
     654              :          */
     655          699 :         relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
     656              :                                                 relid);
     657              : 
     658          699 :         InvalidatePublicationRels(relids);
     659              :     }
     660              : 
     661          767 :     return myself;
     662              : }
     663              : 
     664              : /*
     665              :  * pub_collist_validate
     666              :  *      Process and validate the 'columns' list and ensure the columns are all
     667              :  *      valid to use for a publication.  Checks for and raises an ERROR for
     668              :  *      any unknown columns, system columns, duplicate columns, or virtual
     669              :  *      generated columns.
     670              :  *
     671              :  * Looks up each column's attnum and returns a Bitmapset whose members are
     672              :  * those attnums themselves (no offset is applied).
     673              :  */
     674              : Bitmapset *
     675         1073 : pub_collist_validate(Relation targetrel, List *columns)
     676              : {
     677         1073 :     Bitmapset  *set = NULL;
     678              :     ListCell   *lc;
     679         1073 :     TupleDesc   tupdesc = RelationGetDescr(targetrel);
     680              : 
     681         1618 :     foreach(lc, columns)
     682              :     {
     683          569 :         char       *colname = strVal(lfirst(lc));
     684          569 :         AttrNumber  attnum = get_attnum(RelationGetRelid(targetrel), colname);
     685              : 
     686          569 :         if (attnum == InvalidAttrNumber)
     687            4 :             ereport(ERROR,
     688              :                     errcode(ERRCODE_UNDEFINED_COLUMN),
     689              :                     errmsg("column \"%s\" of relation \"%s\" does not exist",
     690              :                            colname, RelationGetRelationName(targetrel)));
     691              : 
     692          565 :         if (!AttrNumberIsForUserDefinedAttr(attnum))
     693            8 :             ereport(ERROR,
     694              :                     errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     695              :                     errmsg("cannot use system column \"%s\" in publication column list",
     696              :                            colname));
     697              : 
     698          557 :         if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
     699            4 :             ereport(ERROR,
     700              :                     errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     701              :                     errmsg("cannot use virtual generated column \"%s\" in publication column list",
     702              :                            colname));
     703              : 
     704          553 :         if (bms_is_member(attnum, set))
     705            8 :             ereport(ERROR,
     706              :                     errcode(ERRCODE_DUPLICATE_OBJECT),
     707              :                     errmsg("duplicate column \"%s\" in publication column list",
     708              :                            colname));
     709              : 
     710          545 :         set = bms_add_member(set, attnum);
     711              :     }
     712              : 
     713         1049 :     return set;
     714              : }
     715              : 
     716              : /*
     717              :  * Transform a column list (represented by an array Datum) to a bitmapset.
     718              :  *
     719              :  * If columns isn't NULL, add the column numbers to that set.
     720              :  *
     721              :  * If mcxt isn't NULL, build the bitmapset in that context.
     722              :  */
     723              : Bitmapset *
     724          280 : pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
     725              : {
     726          280 :     Bitmapset  *result = columns;
     727              :     ArrayType  *arr;
     728              :     int         nelems;
     729              :     int16      *elems;
     730          280 :     MemoryContext oldcxt = NULL;
     731              : 
     732          280 :     arr = DatumGetArrayTypeP(pubcols);
     733          280 :     nelems = ARR_DIMS(arr)[0];
     734          280 :     elems = (int16 *) ARR_DATA_PTR(arr);
     735              : 
     736              :     /* If a memory context was specified, switch to it. */
     737          280 :     if (mcxt)
     738           41 :         oldcxt = MemoryContextSwitchTo(mcxt);
     739              : 
     740          770 :     for (int i = 0; i < nelems; i++)
     741          490 :         result = bms_add_member(result, elems[i]);
     742              : 
     743          280 :     if (mcxt)
     744           41 :         MemoryContextSwitchTo(oldcxt);
     745              : 
     746          280 :     return result;
     747              : }
     748              : 
     749              : /*
     750              :  * Returns a bitmap representing the columns of the specified table.
     751              :  *
     752              :  * Dropped columns are skipped.  Generated columns are included only if they
     753              :  * are STORED and include_gencols_type is PUBLISH_GENCOLS_STORED.
     754              :  */
     755              : Bitmapset *
     756            9 : pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type)
     757              : {
     758            9 :     Bitmapset  *result = NULL;
     759            9 :     TupleDesc   desc = RelationGetDescr(relation);
     760              : 
     761           30 :     for (int i = 0; i < desc->natts; i++)
     762              :     {
     763           21 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     764              : 
     765           21 :         if (att->attisdropped)
     766            1 :             continue;
     767              : 
     768           20 :         if (att->attgenerated)
     769              :         {
     770              :             /* We only support replication of STORED generated cols. */
     771            2 :             if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
     772            1 :                 continue;
     773              : 
     774              :             /* User hasn't requested to replicate STORED generated cols. */
     775            1 :             if (include_gencols_type != PUBLISH_GENCOLS_STORED)
     776            1 :                 continue;
     777              :         }
     778              : 
     779           18 :         result = bms_add_member(result, att->attnum);
     780              :     }
     781              : 
     782            9 :     return result;
     783              : }
     784              : 
     785              : /*
     786              :  * Insert new publication / schema mapping.
     787              :  */
     788              : ObjectAddress
     789          175 : publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
     790              : {
     791              :     Relation    rel;
     792              :     HeapTuple   tup;
     793              :     Datum       values[Natts_pg_publication_namespace];
     794              :     bool        nulls[Natts_pg_publication_namespace];
     795              :     Oid         psschid;
     796          175 :     Publication *pub = GetPublication(pubid);
     797          175 :     List       *schemaRels = NIL;
     798              :     ObjectAddress myself,
     799              :                 referenced;
     800              : 
     801          175 :     rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
     802              : 
     803              :     /*
     804              :      * Check for duplicates. Note that this does not really prevent
     805              :      * duplicates; it's here just to provide a nicer error message in the
     806              :      * common case. The real protection is the unique key on the catalog.
     807              :      */
     808          175 :     if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
     809              :                               ObjectIdGetDatum(schemaid),
     810              :                               ObjectIdGetDatum(pubid)))
     811              :     {
     812           12 :         table_close(rel, RowExclusiveLock);
     813              : 
     814           12 :         if (if_not_exists)
     815            8 :             return InvalidObjectAddress;
     816              : 
     817            4 :         ereport(ERROR,
     818              :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     819              :                  errmsg("schema \"%s\" is already member of publication \"%s\"",
     820              :                         get_namespace_name(schemaid), pub->name)));
     821              :     }
     822              : 
     823          163 :     check_publication_add_schema(schemaid);
     824              : 
     825              :     /* Form a tuple */
     826          159 :     memset(values, 0, sizeof(values));
     827          159 :     memset(nulls, false, sizeof(nulls));
     828              : 
     829          159 :     psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId,
     830              :                                  Anum_pg_publication_namespace_oid);
     831          159 :     values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid);
     832          159 :     values[Anum_pg_publication_namespace_pnpubid - 1] =
     833          159 :         ObjectIdGetDatum(pubid);
     834          159 :     values[Anum_pg_publication_namespace_pnnspid - 1] =
     835          159 :         ObjectIdGetDatum(schemaid);
     836              : 
     837          159 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     838              : 
     839              :     /* Insert tuple into catalog */
     840          159 :     CatalogTupleInsert(rel, tup);
     841          159 :     heap_freetuple(tup);
     842              : 
     843          159 :     ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid);
     844              : 
     845              :     /* Add dependency on the publication */
     846          159 :     ObjectAddressSet(referenced, PublicationRelationId, pubid);
     847          159 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     848              : 
     849              :     /* Add dependency on the schema */
     850          159 :     ObjectAddressSet(referenced, NamespaceRelationId, schemaid);
     851          159 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     852              : 
     853              :     /* Close the table */
     854          159 :     table_close(rel, RowExclusiveLock);
     855              : 
     856              :     /*
     857              :      * Invalidate relcache so that publication info is rebuilt. See
     858              :      * publication_add_relation for why we need to consider all the
     859              :      * partitions.
     860              :      */
     861          159 :     schemaRels = GetSchemaPublicationRelations(schemaid,
     862              :                                                PUBLICATION_PART_ALL);
     863          159 :     InvalidatePublicationRels(schemaRels);
     864              : 
     865          159 :     return myself;
     866              : }
     867              : 
     868              : /*
     869              :  * Internal function to get the list of publication oids for a relation.
     870              :  *
     871              :  * If except_flag is true, returns the oids of the publications that list the
     872              :  * relation in their EXCEPT clause; otherwise, returns the oids of the
     873              :  * publications in which the relation is included.
     874              :  */
     875              : static List *
     876        16710 : get_relation_publications(Oid relid, bool except_flag)
     877              : {
     878        16710 :     List       *result = NIL;
     879              :     CatCList   *pubrellist;
     880              : 
     881              :     /* Find all publications associated with the relation. */
     882        16710 :     pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
     883              :                                      ObjectIdGetDatum(relid));
     884        18350 :     for (int i = 0; i < pubrellist->n_members; i++)
     885              :     {
     886         1640 :         HeapTuple   tup = &pubrellist->members[i]->tuple;
     887         1640 :         Form_pg_publication_rel pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
     888         1640 :         Oid         pubid = pubrel->prpubid;
     889              : 
     890         1640 :         if (pubrel->prexcept == except_flag)
     891         1169 :             result = lappend_oid(result, pubid);
     892              :     }
     893              : 
     894        16710 :     ReleaseSysCacheList(pubrellist);
     895              : 
     896        16710 :     return result;
     897              : }
     898              : 
     899              : /*
     900              :  * Gets list of publication oids for a relation.
     901              :  */
     902              : List *
     903         9004 : GetRelationIncludedPublications(Oid relid)
     904              : {
     905         9004 :     return get_relation_publications(relid, false);
     906              : }
     907              : 
     908              : /*
     909              :  * Gets list of oids of publications whose EXCEPT clause names the relation.
     910              :  */
     911              : List *
     912         7706 : GetRelationExcludedPublications(Oid relid)
     913              : {
     914         7706 :     return get_relation_publications(relid, true);
     915              : }
     916              : 
     917              : /*
     918              :  * Internal function to get the list of relation oids for a publication.
     919              :  *
     920              :  * If except_flag is true, returns the list of relations specified in the
     921              :  * EXCEPT clause of the publication; otherwise, returns the list of relations
     922              :  * included in the publication.
     923              :  */
     924              : static List *
     925          680 : get_publication_relations(Oid pubid, PublicationPartOpt pub_partopt,
     926              :                           bool except_flag)
     927              : {
     928              :     List       *result;
     929              :     Relation    pubrelsrel;
     930              :     ScanKeyData scankey;
     931              :     SysScanDesc scan;
     932              :     HeapTuple   tup;
     933              : 
     934              :     /* Find all relations associated with the publication. */
     935          680 :     pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
     936              : 
     937          680 :     ScanKeyInit(&scankey,
     938              :                 Anum_pg_publication_rel_prpubid,
     939              :                 BTEqualStrategyNumber, F_OIDEQ,
     940              :                 ObjectIdGetDatum(pubid));
     941              : 
     942          680 :     scan = systable_beginscan(pubrelsrel, PublicationRelPrpubidIndexId,
     943              :                               true, NULL, 1, &scankey);
     944              : 
     945          680 :     result = NIL;
     946         2000 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     947              :     {
     948              :         Form_pg_publication_rel pubrel;
     949              : 
     950          640 :         pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
     951              : 
     952          640 :         if (except_flag == pubrel->prexcept)
     953          640 :             result = GetPubPartitionOptionRelations(result, pub_partopt,
     954              :                                                     pubrel->prrelid);
     955              :     }
     956              : 
     957          680 :     systable_endscan(scan);
     958          680 :     table_close(pubrelsrel, AccessShareLock);
     959              : 
     960              :     /* Now sort and de-duplicate the result list */
     961          680 :     list_sort(result, list_oid_cmp);
     962          680 :     list_deduplicate_oid(result);
     963              : 
     964          680 :     return result;
     965              : }
     966              : 
     967              : /*
     968              :  * Gets list of relation oids that are associated with a publication.
     969              :  *
     970              :  * This should only be used for FOR TABLE publications; FOR ALL
     971              :  * TABLES/SEQUENCES publications should use GetAllPublicationRelations().
     972              :  */
     973              : List *
     974          610 : GetIncludedPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
     975              : {
     976              :     Assert(!GetPublication(pubid)->alltables);
     977              : 
     978          610 :     return get_publication_relations(pubid, pub_partopt, false);
     979              : }
     980              : 
     981              : /*
     982              :  * Gets list of table oids that were specified in the EXCEPT clause for a
     983              :  * publication.
     984              :  *
     985              :  * This should only be used FOR ALL TABLES publications.
     986              :  */
     987              : List *
     988           70 : GetExcludedPublicationTables(Oid pubid, PublicationPartOpt pub_partopt)
     989              : {
     990              :     Assert(GetPublication(pubid)->alltables);
     991              : 
     992           70 :     return get_publication_relations(pubid, pub_partopt, true);
     993              : }
     994              : 
     995              : /*
     996              :  * Gets list of publication oids for publications marked as FOR ALL TABLES.
     997              :  */
     998              : List *
     999         6008 : GetAllTablesPublications(void)
    1000              : {
    1001              :     List       *result;
    1002              :     Relation    rel;
    1003              :     ScanKeyData scankey;
    1004              :     SysScanDesc scan;
    1005              :     HeapTuple   tup;
    1006              : 
    1007              :     /* Find all publications that are marked as for all tables. */
    1008         6008 :     rel = table_open(PublicationRelationId, AccessShareLock);
    1009              : 
    1010         6008 :     ScanKeyInit(&scankey,
    1011              :                 Anum_pg_publication_puballtables,
    1012              :                 BTEqualStrategyNumber, F_BOOLEQ,
    1013              :                 BoolGetDatum(true));
    1014              : 
    1015         6008 :     scan = systable_beginscan(rel, InvalidOid, false,
    1016              :                               NULL, 1, &scankey);
    1017              : 
    1018         6008 :     result = NIL;
    1019         6138 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
    1020              :     {
    1021          130 :         Oid         oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
    1022              : 
    1023          130 :         result = lappend_oid(result, oid);
    1024              :     }
    1025              : 
    1026         6008 :     systable_endscan(scan);
    1027         6008 :     table_close(rel, AccessShareLock);
    1028              : 
    1029         6008 :     return result;
    1030              : }
    1031              : 
    1032              : /*
    1033              :  * Gets list of all relations published by a FOR ALL TABLES/SEQUENCES
    1034              :  * publication.
    1035              :  *
    1036              :  * If the publication publishes partition changes via their respective root
    1037              :  * partitioned tables, we must exclude partitions in favor of including the
    1038              :  * root partitioned tables. This is not applicable to a FOR ALL SEQUENCES
    1039              :  * publication.
    1040              :  *
    1041              :  * For a FOR ALL TABLES publication, the returned list excludes tables mentioned
    1042              :  * in the EXCEPT clause.
    1043              :  */
    1044              : List *
    1045           55 : GetAllPublicationRelations(Oid pubid, char relkind, bool pubviaroot)
    1046              : {
    1047              :     Relation    classRel;
    1048              :     ScanKeyData key[1];
    1049              :     TableScanDesc scan;
    1050              :     HeapTuple   tuple;
    1051           55 :     List       *result = NIL;
    1052           55 :     List       *exceptlist = NIL;
    1053              : 
    1054              :     Assert(!(relkind == RELKIND_SEQUENCE && pubviaroot));
    1055              : 
    1056              :     /* EXCEPT filtering applies only to relations, not sequences */
    1057           55 :     if (relkind == RELKIND_RELATION)
    1058           49 :         exceptlist = GetExcludedPublicationTables(pubid, pubviaroot ?
    1059           49 :                                                   PUBLICATION_PART_ROOT :
    1060              :                                                   PUBLICATION_PART_LEAF);
    1061              : 
    1062           55 :     classRel = table_open(RelationRelationId, AccessShareLock);
    1063              : 
    1064           55 :     ScanKeyInit(&key[0],
    1065              :                 Anum_pg_class_relkind,
    1066              :                 BTEqualStrategyNumber, F_CHAREQ,
    1067              :                 CharGetDatum(relkind));
    1068              : 
    1069           55 :     scan = table_beginscan_catalog(classRel, 1, key);
    1070              : 
    1071         3769 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    1072              :     {
    1073         3714 :         Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
    1074         3714 :         Oid         relid = relForm->oid;
    1075              : 
    1076         3714 :         if (is_publishable_class(relid, relForm) &&
    1077          135 :             !(relForm->relispartition && pubviaroot) &&
    1078          112 :             !list_member_oid(exceptlist, relid))
    1079          104 :             result = lappend_oid(result, relid);
    1080              :     }
    1081              : 
    1082           55 :     table_endscan(scan);
    1083              : 
    1084           55 :     if (pubviaroot)
    1085              :     {
    1086            4 :         ScanKeyInit(&key[0],
    1087              :                     Anum_pg_class_relkind,
    1088              :                     BTEqualStrategyNumber, F_CHAREQ,
    1089              :                     CharGetDatum(RELKIND_PARTITIONED_TABLE));
    1090              : 
    1091            4 :         scan = table_beginscan_catalog(classRel, 1, key);
    1092              : 
    1093           21 :         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    1094              :         {
    1095           17 :             Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
    1096           17 :             Oid         relid = relForm->oid;
    1097              : 
    1098           17 :             if (is_publishable_class(relid, relForm) &&
    1099           17 :                 !relForm->relispartition &&
    1100           13 :                 !list_member_oid(exceptlist, relid))
    1101           12 :                 result = lappend_oid(result, relid);
    1102              :         }
    1103              : 
    1104            4 :         table_endscan(scan);
    1105              :     }
    1106              : 
    1107           55 :     table_close(classRel, AccessShareLock);
    1108           55 :     return result;
    1109              : }
    1110              : 
    1111              : /*
    1112              :  * Gets the list of schema oids for a publication.
    1113              :  *
    1114              :  * This should only be used FOR TABLES IN SCHEMA publications.
    1115              :  */
    1116              : List *
    1117          576 : GetPublicationSchemas(Oid pubid)
    1118              : {
    1119          576 :     List       *result = NIL;
    1120              :     Relation    pubschsrel;
    1121              :     ScanKeyData scankey;
    1122              :     SysScanDesc scan;
    1123              :     HeapTuple   tup;
    1124              : 
    1125              :     /* Find all schemas associated with the publication */
    1126          576 :     pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
    1127              : 
    1128          576 :     ScanKeyInit(&scankey,
    1129              :                 Anum_pg_publication_namespace_pnpubid,
    1130              :                 BTEqualStrategyNumber, F_OIDEQ,
    1131              :                 ObjectIdGetDatum(pubid));
    1132              : 
    1133          576 :     scan = systable_beginscan(pubschsrel,
    1134              :                               PublicationNamespacePnnspidPnpubidIndexId,
    1135              :                               true, NULL, 1, &scankey);
    1136          616 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
    1137              :     {
    1138              :         Form_pg_publication_namespace pubsch;
    1139              : 
    1140           40 :         pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
    1141              : 
    1142           40 :         result = lappend_oid(result, pubsch->pnnspid);
    1143              :     }
    1144              : 
    1145          576 :     systable_endscan(scan);
    1146          576 :     table_close(pubschsrel, AccessShareLock);
    1147              : 
    1148          576 :     return result;
    1149              : }
    1150              : 
    1151              : /*
    1152              :  * Gets the list of publication oids associated with a specified schema.
                      :  *
                      :  * The lookup uses the PUBLICATIONNAMESPACEMAP syscache list keyed by
                      :  * schemaid, and pnpubid is collected from every member of that list.
                      :  * NIL is returned when the cache list has no members.
    1153              :  */
    1154              : List *
    1155         8672 : GetSchemaPublications(Oid schemaid)
    1156              : {
    1157         8672 :     List       *result = NIL;
    1158              :     CatCList   *pubschlist;
    1159              :     int         i;
    1160              : 
    1161              :     /* Find all publications associated with the schema */
    1162         8672 :     pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
    1163              :                                      ObjectIdGetDatum(schemaid));
    1164         8752 :     for (i = 0; i < pubschlist->n_members; i++)
    1165              :     {
    1166           80 :         HeapTuple   tup = &pubschlist->members[i]->tuple;
    1167           80 :         Oid         pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
    1168              : 
    1169           80 :         result = lappend_oid(result, pubid);
    1170              :     }
    1171              : 
    1172         8672 :     ReleaseSysCacheList(pubschlist);    /* result keeps only the OID values */
    1173              : 
    1174         8672 :     return result;
    1175              : }
    1176              : 
    1177              : /*
    1178              :  * Get the list of publishable relation oids for a specified schema.
                      :  *
                      :  * pg_class is scanned under AccessShareLock with a scan key on relnamespace.
                      :  * Relations rejected by is_publishable_class() are skipped.  Plain tables
                      :  * (RELKIND_RELATION) are appended as-is; partitioned tables are expanded by
                      :  * GetPubPartitionOptionRelations() according to pub_partopt and merged in
                      :  * with list_concat_unique_oid().  Any other relkind that passes the
                      :  * publishable check is not added to the result.
                      :  *
                      :  * schemaid must be a valid OID (asserted).
                      :  *
                      :  * NOTE(review): relkind is fetched with get_rel_relkind(relid) even though
                      :  * relForm is already at hand; presumably relForm->relkind holds the same
                      :  * value -- confirm before changing.
    1179              :  */
    1180              : List *
    1181          310 : GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
    1182              : {
    1183              :     Relation    classRel;
    1184              :     ScanKeyData key[1];
    1185              :     TableScanDesc scan;
    1186              :     HeapTuple   tuple;
    1187          310 :     List       *result = NIL;
    1188              : 
    1189              :     Assert(OidIsValid(schemaid));
    1190              : 
    1191          310 :     classRel = table_open(RelationRelationId, AccessShareLock);
    1192              : 
    1193          310 :     ScanKeyInit(&key[0],
    1194              :                 Anum_pg_class_relnamespace,
    1195              :                 BTEqualStrategyNumber, F_OIDEQ,
    1196              :                 ObjectIdGetDatum(schemaid));
    1197              : 
    1198              :     /* get all the relations present in the specified schema */
    1199          310 :     scan = table_beginscan_catalog(classRel, 1, key);
    1200        17256 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    1201              :     {
    1202        16946 :         Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
    1203        16946 :         Oid         relid = relForm->oid;
    1204              :         char        relkind;
    1205              : 
    1206        16946 :         if (!is_publishable_class(relid, relForm))
    1207         5839 :             continue;
    1208              : 
    1209        11107 :         relkind = get_rel_relkind(relid);
    1210        11107 :         if (relkind == RELKIND_RELATION)
    1211         9554 :             result = lappend_oid(result, relid);
    1212         1553 :         else if (relkind == RELKIND_PARTITIONED_TABLE)
    1213              :         {
    1214          495 :             List       *partitionrels = NIL;
    1215              : 
    1216              :             /*
    1217              :              * It is quite possible that some of the partitions are in a
    1218              :              * different schema than the parent table, so we need to get such
    1219              :              * partitions separately.
    1220              :              */
    1221          495 :             partitionrels = GetPubPartitionOptionRelations(partitionrels,
    1222              :                                                            pub_partopt,
    1223              :                                                            relForm->oid);
    1224          495 :             result = list_concat_unique_oid(result, partitionrels);
    1225              :         }
    1226              :     }
    1227              : 
    1228          310 :     table_endscan(scan);
    1229          310 :     table_close(classRel, AccessShareLock);
    1230          310 :     return result;
    1231              : }
    1232              : 
    1233              : /*
    1234              :  * Gets the list of all relations published by FOR TABLES IN SCHEMA
    1235              :  * publication.
                      :  *
                      :  * Every schema returned by GetPublicationSchemas(pubid) is expanded with
                      :  * GetSchemaPublicationRelations() using pub_partopt, and the per-schema
                      :  * lists are joined with list_concat(), i.e. without any de-duplication at
                      :  * this level.  Returns NIL if the publication has no schemas.
    1236              :  */
    1237              : List *
    1238          283 : GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
    1239              : {
    1240          283 :     List       *result = NIL;
    1241          283 :     List       *pubschemalist = GetPublicationSchemas(pubid);
    1242              :     ListCell   *cell;
    1243              : 
    1244          303 :     foreach(cell, pubschemalist)
    1245              :     {
    1246           20 :         Oid         schemaid = lfirst_oid(cell);
    1247           20 :         List       *schemaRels = NIL;
    1248              : 
    1249           20 :         schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt);
    1250           20 :         result = list_concat(result, schemaRels);
    1251              :     }
    1252              : 
    1253          283 :     return result;
    1254              : }
    1255              : 
    1256              : /*
    1257              :  * Get publication using oid
    1258              :  *
    1259              :  * The Publication struct and its data are palloc'ed here.
                      :  *
                      :  * The pg_publication row is fetched through the PUBLICATIONOID syscache; a
                      :  * failed lookup is reported with elog(ERROR).  The name is pstrdup'ed and
                      :  * the remaining fields are copied by value, after which the cache tuple is
                      :  * released.
    1260              :  */
    1261              : Publication *
    1262         4199 : GetPublication(Oid pubid)
    1263              : {
    1264              :     HeapTuple   tup;
    1265              :     Publication *pub;
    1266              :     Form_pg_publication pubform;
    1267              : 
    1268         4199 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1269         4199 :     if (!HeapTupleIsValid(tup))
    1270            0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
    1271              : 
    1272         4199 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1273              : 
    1274         4199 :     pub = palloc_object(Publication);
    1275         4199 :     pub->oid = pubid;
    1276         4199 :     pub->name = pstrdup(NameStr(pubform->pubname));
    1277         4199 :     pub->alltables = pubform->puballtables;
    1278         4199 :     pub->allsequences = pubform->puballsequences;
    1279         4199 :     pub->pubactions.pubinsert = pubform->pubinsert;
    1280         4199 :     pub->pubactions.pubupdate = pubform->pubupdate;
    1281         4199 :     pub->pubactions.pubdelete = pubform->pubdelete;
    1282         4199 :     pub->pubactions.pubtruncate = pubform->pubtruncate;
    1283         4199 :     pub->pubviaroot = pubform->pubviaroot;
    1284         4199 :     pub->pubgencols_type = pubform->pubgencols;
    1285              : 
    1286         4199 :     ReleaseSysCache(tup);   /* pubform must not be used past this point */
    1287              : 
    1288         4199 :     return pub;
    1289              : }
    1290              : 
    1291              : /*
    1292              :  * Get Publication using name.
                      :  *
                      :  * The name is resolved with get_publication_oid(pubname, missing_ok).  If
                      :  * that yields an invalid OID, NULL is returned; otherwise the result of
                      :  * GetPublication() for that OID is returned.
                      :  *
                      :  * NOTE(review): presumably an invalid OID can only come back when
                      :  * missing_ok is true -- get_publication_oid() is defined elsewhere, confirm
                      :  * there.
    1293              :  */
    1294              : Publication *
    1295         1789 : GetPublicationByName(const char *pubname, bool missing_ok)
    1296              : {
    1297              :     Oid         oid;
    1298              : 
    1299         1789 :     oid = get_publication_oid(pubname, missing_ok);
    1300              : 
    1301         1789 :     return OidIsValid(oid) ? GetPublication(oid) : NULL;
    1302              : }
    1303              : 
    1304              : /*
    1305              :  * A helper function for pg_get_publication_tables() to check whether the
    1306              :  * table with the given relid is published in the specified publication.
    1307              :  *
    1308              :  * This function evaluates the effective published OID based on the
    1309              :  * publish_via_partition_root setting, rather than just checking catalog entries
    1310              :  * (e.g., pg_publication_rel). For instance, when publish_via_partition_root is
    1311              :  * false, it returns false for a parent partitioned table and returns true
    1312              :  * for its leaf partitions, even if the parent is the one explicitly added
    1313              :  * to the publication.
    1314              :  *
    1315              :  * For performance reasons, this function avoids the overhead of constructing
    1316              :  * the complete list of published tables during the evaluation. It can execute
    1317              :  * quickly even when the publication contains a large number of relations.
    1318              :  *
                      :  * The checks run in this order: (1) a partitioned table in a non-pubviaroot
                      :  * publication, (2) ALL TABLES publications, (3) partitions that have an
                      :  * ancestor in the publication, (4) explicit membership through
                      :  * pg_publication_rel or pg_publication_namespace.
                      :  *
    1319              :  * Note: this leaks memory for the ancestors list into the current memory
    1320              :  * context.
    1321              :  */
    1322              : static bool
    1323          990 : is_table_publishable_in_publication(Oid relid, Publication *pub)
    1324              : {
    1325              :     bool        relispartition;
    1326          990 :     List       *ancestors = NIL;
    1327              : 
    1328              :     /*
    1329              :      * For non-pubviaroot publications, a partitioned table is never the
    1330              :      * effective published OID; only its leaf partitions can be.
    1331              :      */
    1332          990 :     if (!pub->pubviaroot && get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE)
    1333           86 :         return false;
    1334              : 
    1335          904 :     relispartition = get_rel_relispartition(relid);
    1336              : 
    1337          904 :     if (relispartition)
    1338          170 :         ancestors = get_partition_ancestors(relid);
    1339              : 
    1340          904 :     if (pub->alltables)
    1341              :     {
    1342              :         /*
    1343              :          * ALL TABLES with pubviaroot includes only regular tables or top-most
    1344              :          * partitioned tables -- never child partitions.
    1345              :          */
    1346          204 :         if (pub->pubviaroot && relispartition)
    1347           12 :             return false;
    1348              : 
    1349              :         /*
    1350              :          * For ALL TABLES publications, the table is published unless it
    1351              :          * appears in the EXCEPT clause. Only the top-most can appear in the
    1352              :          * EXCEPT clause, so exclusion must be evaluated at the top-most
    1353              :          * ancestor if it has one (taken below as the last element of the
                      :          * ancestors list). These publications store only EXCEPT'ed tables
    1354              :          * in pg_publication_rel, so checking existence is sufficient.
    1355              :          *
    1356              :          * Note that this existence check below would incorrectly return true
    1357              :          * (published) for partitions when pubviaroot is enabled; however,
    1358              :          * that case is already caught and returned false by the above check.
    1359              :          */
    1360          192 :         return !SearchSysCacheExists2(PUBLICATIONRELMAP,
    1361              :                                       ObjectIdGetDatum(ancestors
    1362              :                                                        ? llast_oid(ancestors) : relid),
    1363              :                                       ObjectIdGetDatum(pub->oid));
    1364              :     }
    1365              : 
    1366              :     /*
    1367              :      * Non-ALL-TABLES publication cases.
    1368              :      *
    1369              :      * A table is published if it (or a containing schema) was explicitly
    1370              :      * added, or if it is a partition whose ancestor was added.
    1371              :      */
    1372              : 
    1373              :     /*
    1374              :      * If an ancestor is published, the partition's status depends on
    1375              :      * publish_via_partition_root value.
    1376              :      *
    1377              :      * If it's true, the ancestor's relation OID is the effective published
    1378              :      * OID, so the partition itself should be excluded (return false).
    1379              :      *
    1380              :      * If it's false, the partition is covered by its ancestor's presence in
    1381              :      * the publication, it should be included (return true).
    1382              :      */
    1383          840 :     if (relispartition &&
    1384          140 :         OidIsValid(GetTopMostAncestorInPublication(pub->oid, ancestors, NULL)))
    1385           44 :         return !pub->pubviaroot;
    1386              : 
    1387              :     /*
    1388              :      * Check whether the table is explicitly published via pg_publication_rel
    1389              :      * or pg_publication_namespace.
    1390              :      */
    1391          656 :     return (SearchSysCacheExists2(PUBLICATIONRELMAP,
    1392              :                                   ObjectIdGetDatum(relid),
    1393         1026 :                                   ObjectIdGetDatum(pub->oid)) ||
    1394          370 :             SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
    1395              :                                   ObjectIdGetDatum(get_rel_namespace(relid)),
    1396              :                                   ObjectIdGetDatum(pub->oid)));
    1397              : }
    1398              : 
    1399              : /*
    1400              :  * Helper function to get information of the tables in the given
    1401              :  * publication(s).
    1402              :  *
    1403              :  * If filter_by_relid is true, only the row(s) for target_relid is returned;
    1404              :  * if target_relid does not exist or is not part of the publications, zero
    1405              :  * rows are returned.  If filter_by_relid is false, rows for all tables
    1406              :  * within the specified publications are returned and target_relid is
    1407              :  * ignored.
    1408              :  *
                      :  * pubnames is an array of publication names (deconstructed as TEXTOID
                      :  * elements).  pub_missing_ok is passed through to GetPublicationByName();
                      :  * any publication for which that returns NULL is skipped.
                      :  *
                      :  * This is written as a set-returning function: the first call builds the
                      :  * list of (relid, pubid) pairs in the multi-call memory context and stores
                      :  * it in funcctx->user_fctx; every call then emits one row from that list.
                      :  *
    1409              :  * Returns pubid, relid, column list, and row filter for each table.
    1410              :  */
    1411              : static Datum
    1412         1608 : pg_get_publication_tables(FunctionCallInfo fcinfo, ArrayType *pubnames,
    1413              :                           Oid target_relid, bool filter_by_relid,
    1414              :                           bool pub_missing_ok)
    1415              : {
    1416              : #define NUM_PUBLICATION_TABLES_ELEM 4
    1417              :     FuncCallContext *funcctx;
    1418         1608 :     List       *table_infos = NIL;
    1419              : 
    1420              :     /* stuff done only on the first call of the function */
    1421         1608 :     if (SRF_IS_FIRSTCALL())
    1422              :     {
    1423              :         TupleDesc   tupdesc;
    1424              :         MemoryContext oldcontext;
    1425              :         Datum      *elems;
    1426              :         int         nelems,
    1427              :                     i;
    1428          758 :         bool        viaroot = false;
    1429              : 
    1430              :         /* create a function context for cross-call persistence */
    1431          758 :         funcctx = SRF_FIRSTCALL_INIT();
    1432              : 
    1433              :         /*
    1434              :          * Preliminary check if the specified table can be published in the
    1435              :          * first place. If not, we can return early without checking the given
    1436              :          * publications and the table.
    1437              :          */
    1438          758 :         if (filter_by_relid && !is_publishable_table(target_relid))
    1439            8 :             SRF_RETURN_DONE(funcctx);
    1440              : 
    1441              :         /* switch to memory context appropriate for multiple function calls */
    1442          750 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    1443              : 
    1444              :         /*
    1445              :          * Deconstruct the parameter into elements where each element is a
    1446              :          * publication name.
    1447              :          */
    1448          750 :         deconstruct_array_builtin(pubnames, TEXTOID, &elems, NULL, &nelems);
    1449              : 
    1450              :         /* Get Oids of tables from each publication. */
    1451         1998 :         for (i = 0; i < nelems; i++)
    1452              :         {
    1453              :             Publication *pub_elem;
    1454         1248 :             List       *pub_elem_tables = NIL;
    1455              :             ListCell   *lc;
    1456              : 
    1457         1248 :             pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]),
    1458              :                                             pub_missing_ok);
    1459              : 
    1460         1248 :             if (pub_elem == NULL)
    1461            6 :                 continue;       /* lookup returned no publication; skip it */
    1462              : 
    1463         1242 :             if (filter_by_relid)
    1464              :             {
    1465              :                 /* Check if the given table is published for the publication */
    1466          990 :                 if (is_table_publishable_in_publication(target_relid, pub_elem))
    1467              :                 {
    1468          504 :                     pub_elem_tables = list_make1_oid(target_relid);
    1469              :                 }
    1470              :             }
    1471              :             else
    1472              :             {
    1473              :                 /*
    1474              :                  * Publications support partitioned tables. If
    1475              :                  * publish_via_partition_root is false, all changes are
    1476              :                  * replicated using leaf partition identity and schema, so we
    1477              :                  * only need those. Otherwise, get the partitioned table
    1478              :                  * itself.
    1479              :                  */
    1480          252 :                 if (pub_elem->alltables)
    1481           49 :                     pub_elem_tables = GetAllPublicationRelations(pub_elem->oid,
    1482              :                                                                  RELKIND_RELATION,
    1483           49 :                                                                  pub_elem->pubviaroot);
    1484              :                 else
    1485              :                 {
    1486              :                     List       *relids,
    1487              :                                *schemarelids;
    1488              : 
    1489          203 :                     relids = GetIncludedPublicationRelations(pub_elem->oid,
    1490          203 :                                                              pub_elem->pubviaroot ?
    1491          203 :                                                              PUBLICATION_PART_ROOT :
    1492              :                                                              PUBLICATION_PART_LEAF);
    1493          203 :                     schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
    1494          203 :                                                                     pub_elem->pubviaroot ?
    1495          203 :                                                                     PUBLICATION_PART_ROOT :
    1496              :                                                                     PUBLICATION_PART_LEAF);
    1497          203 :                     pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
    1498              :                 }
    1499              :             }
    1500              : 
    1501              :             /*
    1502              :              * Record the published table and the corresponding publication so
    1503              :              * that we can get row filters and column lists later.
    1504              :              *
    1505              :              * When a table is published by multiple publications, to obtain
    1506              :              * all row filters and column lists, the structure related to this
    1507              :              * table will be recorded multiple times.
    1508              :              */
    1509         2122 :             foreach(lc, pub_elem_tables)
    1510              :             {
    1511          880 :                 published_rel *table_info = palloc_object(published_rel);
    1512              : 
    1513          880 :                 table_info->relid = lfirst_oid(lc);
    1514          880 :                 table_info->pubid = pub_elem->oid;
    1515          880 :                 table_infos = lappend(table_infos, table_info);
    1516              :             }
    1517              : 
    1518              :             /* At least one publication is using publish_via_partition_root. */
    1519         1242 :             if (pub_elem->pubviaroot)
    1520          249 :                 viaroot = true;
    1521              :         }
    1522              : 
    1523              :         /*
    1524              :          * If the publication publishes partition changes via their respective
    1525              :          * root partitioned tables, we must exclude partitions in favor of
    1526              :          * including the root partitioned tables. Otherwise, the function
    1527              :          * could return both the child and parent tables which could cause
    1528              :          * data of the child table to be double-published on the subscriber
    1529              :          * side.
    1530              :          */
    1531          750 :         if (viaroot)
    1532          169 :             filter_partitions(table_infos);
    1533              : 
    1534              :         /* Construct a tuple descriptor for the result rows. */
    1535          750 :         tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
    1536          750 :         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
    1537              :                            OIDOID, -1, 0);
    1538          750 :         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
    1539              :                            OIDOID, -1, 0);
    1540          750 :         TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
    1541              :                            INT2VECTOROID, -1, 0);
    1542          750 :         TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
    1543              :                            PG_NODE_TREEOID, -1, 0);
    1544              : 
    1545          750 :         TupleDescFinalize(tupdesc);
    1546          750 :         funcctx->tuple_desc = BlessTupleDesc(tupdesc);
    1547          750 :         funcctx->user_fctx = table_infos;
    1548              : 
    1549          750 :         MemoryContextSwitchTo(oldcontext);
    1550              :     }
    1551              : 
    1552              :     /* stuff done on every call of the function */
    1553         1600 :     funcctx = SRF_PERCALL_SETUP();
    1554         1600 :     table_infos = (List *) funcctx->user_fctx;
    1555              : 
    1556         1600 :     if (funcctx->call_cntr < list_length(table_infos))
    1557              :     {
    1558          850 :         HeapTuple   pubtuple = NULL;
    1559              :         HeapTuple   rettuple;
    1560              :         Publication *pub;
    1561          850 :         published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
    1562          850 :         Oid         relid = table_info->relid;
    1563          850 :         Oid         schemaid = get_rel_namespace(relid);
    1564          850 :         Datum       values[NUM_PUBLICATION_TABLES_ELEM] = {0};
    1565          850 :         bool        nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
    1566              : 
    1567              :         /*
    1568              :          * Form tuple with appropriate data.
    1569              :          */
    1570              : 
    1571          850 :         pub = GetPublication(table_info->pubid);
    1572              : 
    1573          850 :         values[0] = ObjectIdGetDatum(pub->oid);
    1574          850 :         values[1] = ObjectIdGetDatum(relid);
    1575              : 
    1576              :         /*
    1577              :          * We don't consider row filters or column lists for FOR ALL TABLES or
    1578              :          * FOR TABLES IN SCHEMA publications.
                      :          *
                      :          * In those cases pubtuple stays NULL, so both attrs and qual take the
                      :          * "not specified" path below.
    1579              :          */
    1580          850 :         if (!pub->alltables &&
    1581          577 :             !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
    1582              :                                    ObjectIdGetDatum(schemaid),
    1583              :                                    ObjectIdGetDatum(pub->oid)))
    1584          541 :             pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
    1585              :                                            ObjectIdGetDatum(relid),
    1586              :                                            ObjectIdGetDatum(pub->oid));
    1587              : 
    1588          850 :         if (HeapTupleIsValid(pubtuple))
    1589              :         {
    1590              :             /* Lookup the column list attribute. */
    1591          502 :             values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
    1592              :                                         Anum_pg_publication_rel_prattrs,
    1593              :                                         &(nulls[2]));
    1594              : 
    1595              :             /* Null indicates no filter. */
    1596          502 :             values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
    1597              :                                         Anum_pg_publication_rel_prqual,
    1598              :                                         &(nulls[3]));
    1599              :         }
    1600              :         else
    1601              :         {
    1602          348 :             nulls[2] = true;
    1603          348 :             nulls[3] = true;
    1604              :         }
    1605              : 
    1606              :         /* Show all columns when the column list is not specified. */
    1607          850 :         if (nulls[2])
    1608              :         {
    1609          754 :             Relation    rel = table_open(relid, AccessShareLock);
    1610          754 :             int         nattnums = 0;
    1611              :             int16      *attnums;
    1612          754 :             TupleDesc   desc = RelationGetDescr(rel);
    1613              :             int         i;
    1614              : 
    1615          754 :             attnums = palloc_array(int16, desc->natts);
    1616              : 
    1617         1990 :             for (i = 0; i < desc->natts; i++)
    1618              :             {
    1619         1236 :                 Form_pg_attribute att = TupleDescAttr(desc, i);
    1620              : 
    1621         1236 :                 if (att->attisdropped)
    1622            4 :                     continue;
    1623              : 
    1624         1232 :                 if (att->attgenerated)
    1625              :                 {
    1626              :                     /* We only support replication of STORED generated cols. */
    1627           22 :                     if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
    1628           10 :                         continue;
    1629              : 
    1630              :                     /*
    1631              :                      * User hasn't requested to replicate STORED generated
    1632              :                      * cols.
    1633              :                      */
    1634           12 :                     if (pub->pubgencols_type != PUBLISH_GENCOLS_STORED)
    1635            9 :                         continue;
    1636              :                 }
    1637              : 
    1638         1213 :                 attnums[nattnums++] = att->attnum;
    1639              :             }
    1640              : 
    1641          754 :             if (nattnums > 0)   /* otherwise attrs is left NULL */
    1642              :             {
    1643          750 :                 values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
    1644          750 :                 nulls[2] = false;
    1645              :             }
    1646              : 
    1647          754 :             table_close(rel, AccessShareLock);
    1648              :         }
    1649              : 
    1650          850 :         rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
    1651              : 
    1652          850 :         SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
    1653              :     }
    1654              : 
    1655          750 :     SRF_RETURN_DONE(funcctx);
    1656              : }
    1657              : 
    1658              : Datum
    1659          550 : pg_get_publication_tables_a(PG_FUNCTION_ARGS)
    1660              : {
    1661              :     /*
    1662              :      * Get information for all tables in the given publications.
    1663              :      * filter_by_relid is false so all tables are returned; pub_missing_ok is
    1664              :      * false for backward compatibility.
                      :      *
                      :      * Argument 0 is the array of publication names.  InvalidOid is passed
                      :      * as target_relid, which the helper ignores when filter_by_relid is
                      :      * false.
    1665              :      */
    1666          550 :     return pg_get_publication_tables(fcinfo, PG_GETARG_ARRAYTYPE_P(0),
    1667              :                                      InvalidOid, false, false);
    1668              : }
    1669              : 
    1670              : Datum
    1671         1058 : pg_get_publication_tables_b(PG_FUNCTION_ARGS)
    1672              : {
    1673              :     /*
    1674              :      * Get information for the specified table in the given publications. The
    1675              :      * SQL-level function is declared STRICT, so target_relid is guaranteed to
    1676              :      * be non-NULL here.
                      :      *
                      :      * Argument 0 is the array of publication names and argument 1 is the
                      :      * table OID.  Both filter_by_relid and pub_missing_ok are passed as
                      :      * true.
    1677              :      */
    1678         1058 :     return pg_get_publication_tables(fcinfo, PG_GETARG_ARRAYTYPE_P(0),
    1679              :                                      PG_GETARG_OID(1), true, true);
    1680              : }
    1681              : 
    1682              : /*
    1683              :  * Returns Oids of sequences in a publication.
                      :  *
                      :  * Set-returning function; argument 0 is the publication name (text).  On
                      :  * the first call the publication is looked up with missing_ok = false and,
                      :  * if its allsequences flag is set, the sequence list is obtained from
                      :  * GetAllPublicationRelations() with RELKIND_SEQUENCE.  The list is kept in
                      :  * funcctx->user_fctx and one OID is returned per call.  A publication
                      :  * without allsequences yields no rows.
                      :  *
                      :  * NOTE(review): the lookup result is dereferenced without a NULL check;
                      :  * presumably the missing_ok = false lookup raises an error for an unknown
                      :  * name -- confirm in get_publication_oid().
    1684              :  */
    1685              : Datum
    1686          234 : pg_get_publication_sequences(PG_FUNCTION_ARGS)
    1687              : {
    1688              :     FuncCallContext *funcctx;
    1689          234 :     List       *sequences = NIL;
    1690              : 
    1691              :     /* stuff done only on the first call of the function */
    1692          234 :     if (SRF_IS_FIRSTCALL())
    1693              :     {
    1694          217 :         char       *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1695              :         Publication *publication;
    1696              :         MemoryContext oldcontext;
    1697              : 
    1698              :         /* create a function context for cross-call persistence */
    1699          217 :         funcctx = SRF_FIRSTCALL_INIT();
    1700              : 
    1701              :         /* switch to memory context appropriate for multiple function calls */
    1702          217 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    1703              : 
    1704          217 :         publication = GetPublicationByName(pubname, false);
    1705              : 
    1706          217 :         if (publication->allsequences)
    1707            6 :             sequences = GetAllPublicationRelations(publication->oid,
    1708              :                                                    RELKIND_SEQUENCE,
    1709              :                                                    false);
    1710              : 
    1711          217 :         funcctx->user_fctx = sequences;
    1712              : 
    1713          217 :         MemoryContextSwitchTo(oldcontext);
    1714              :     }
    1715              : 
    1716              :     /* stuff done on every call of the function */
    1717          234 :     funcctx = SRF_PERCALL_SETUP();
    1718          234 :     sequences = (List *) funcctx->user_fctx;
    1719              : 
    1720          234 :     if (funcctx->call_cntr < list_length(sequences))
    1721              :     {
    1722           17 :         Oid         relid = list_nth_oid(sequences, funcctx->call_cntr);
    1723              : 
    1724           17 :         SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
    1725              :     }
    1726              : 
    1727          217 :     SRF_RETURN_DONE(funcctx);
    1728              : }
        

Generated by: LCOV version 2.0-1