LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_publication.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 99.3 % 460 457
Test Date: 2026-02-28 14:14:49 Functions: 100.0 % 29 29
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * pg_publication.c
       4              :  *      publication C API manipulation
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  * IDENTIFICATION
      10              :  *      src/backend/catalog/pg_publication.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres.h"
      16              : 
      17              : #include "access/genam.h"
      18              : #include "access/heapam.h"
      19              : #include "access/htup_details.h"
      20              : #include "access/tableam.h"
      21              : #include "catalog/catalog.h"
      22              : #include "catalog/dependency.h"
      23              : #include "catalog/indexing.h"
      24              : #include "catalog/namespace.h"
      25              : #include "catalog/objectaddress.h"
      26              : #include "catalog/partition.h"
      27              : #include "catalog/pg_inherits.h"
      28              : #include "catalog/pg_namespace.h"
      29              : #include "catalog/pg_publication.h"
      30              : #include "catalog/pg_publication_namespace.h"
      31              : #include "catalog/pg_publication_rel.h"
      32              : #include "catalog/pg_type.h"
      33              : #include "commands/publicationcmds.h"
      34              : #include "funcapi.h"
      35              : #include "utils/array.h"
      36              : #include "utils/builtins.h"
      37              : #include "utils/catcache.h"
      38              : #include "utils/fmgroids.h"
      39              : #include "utils/lsyscache.h"
      40              : #include "utils/rel.h"
      41              : #include "utils/syscache.h"
      42              : 
      43              : /* Records association between publication and published table */
      44              : typedef struct
      45              : {
      46              :     Oid         relid;          /* OID of published table */
      47              :     Oid         pubid;          /* OID of publication that publishes this
      48              :                                  * table. */
      49              : } published_rel;
      50              : 
      51              : /*
      52              :  * Check if relation can be in given publication and throws appropriate
      53              :  * error if not.
      54              :  */
      55              : static void
      56          566 : check_publication_add_relation(Relation targetrel)
      57              : {
      58              :     /* Must be a regular or partitioned table */
      59          566 :     if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
      60           72 :         RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
      61            7 :         ereport(ERROR,
      62              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      63              :                  errmsg("cannot add relation \"%s\" to publication",
      64              :                         RelationGetRelationName(targetrel)),
      65              :                  errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind)));
      66              : 
      67              :     /* Can't be system table */
      68          559 :     if (IsCatalogRelation(targetrel))
      69            3 :         ereport(ERROR,
      70              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      71              :                  errmsg("cannot add relation \"%s\" to publication",
      72              :                         RelationGetRelationName(targetrel)),
      73              :                  errdetail("This operation is not supported for system tables.")));
      74              : 
      75              :     /* UNLOGGED and TEMP relations cannot be part of publication. */
      76          556 :     if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
      77            3 :         ereport(ERROR,
      78              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      79              :                  errmsg("cannot add relation \"%s\" to publication",
      80              :                         RelationGetRelationName(targetrel)),
      81              :                  errdetail("This operation is not supported for temporary tables.")));
      82          553 :     else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
      83            3 :         ereport(ERROR,
      84              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      85              :                  errmsg("cannot add relation \"%s\" to publication",
      86              :                         RelationGetRelationName(targetrel)),
      87              :                  errdetail("This operation is not supported for unlogged tables.")));
      88          550 : }
      89              : 
      90              : /*
      91              :  * Check if schema can be in given publication and throw appropriate error if
      92              :  * not.
      93              :  */
      94              : static void
      95          122 : check_publication_add_schema(Oid schemaid)
      96              : {
      97              :     /* Can't be system namespace */
      98          122 :     if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid))
      99            3 :         ereport(ERROR,
     100              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     101              :                  errmsg("cannot add schema \"%s\" to publication",
     102              :                         get_namespace_name(schemaid)),
     103              :                  errdetail("This operation is not supported for system schemas.")));
     104              : 
     105              :     /* Can't be temporary namespace */
     106          119 :     if (isAnyTempNamespace(schemaid))
     107            0 :         ereport(ERROR,
     108              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     109              :                  errmsg("cannot add schema \"%s\" to publication",
     110              :                         get_namespace_name(schemaid)),
     111              :                  errdetail("Temporary schemas cannot be replicated.")));
     112          119 : }
     113              : 
     114              : /*
     115              :  * Returns if relation represented by oid and Form_pg_class entry
     116              :  * is publishable.
     117              :  *
     118              :  * Does same checks as check_publication_add_relation() above except for
     119              :  * RELKIND_SEQUENCE, but does not need relation to be opened and also does
     120              :  * not throw errors. Here, the additional check is to support ALL SEQUENCES
     121              :  * publication.
     122              :  *
     123              :  * XXX  This also excludes all tables with relid < FirstNormalObjectId,
     124              :  * ie all tables created during initdb.  This mainly affects the preinstalled
     125              :  * information_schema.  IsCatalogRelationOid() only excludes tables with
     126              :  * relid < FirstUnpinnedObjectId, making that test rather redundant,
     127              :  * but really we should get rid of the FirstNormalObjectId test not
     128              :  * IsCatalogRelationOid.  We can't do so today because we don't want
     129              :  * information_schema tables to be considered publishable; but this test
     130              :  * is really inadequate for that, since the information_schema could be
     131              :  * dropped and reloaded and then it'll be considered publishable.  The best
     132              :  * long-term solution may be to add a "relispublishable" bool to pg_class,
     133              :  * and depend on that instead of OID checks.
     134              :  */
     135              : static bool
     136       300530 : is_publishable_class(Oid relid, Form_pg_class reltuple)
     137              : {
     138       306732 :     return (reltuple->relkind == RELKIND_RELATION ||
     139         6202 :             reltuple->relkind == RELKIND_PARTITIONED_TABLE ||
     140         5432 :             reltuple->relkind == RELKIND_SEQUENCE) &&
     141       296131 :         !IsCatalogRelationOid(relid) &&
     142       601060 :         reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
     143              :         relid >= FirstNormalObjectId;
     144              : }
     145              : 
     146              : /*
     147              :  * Another variant of is_publishable_class(), taking a Relation.
     148              :  */
     149              : bool
     150       271784 : is_publishable_relation(Relation rel)
     151              : {
     152       271784 :     return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
     153              : }
     154              : 
     155              : /*
     156              :  * SQL-callable variant of the above
     157              :  *
     158              :  * This returns null when the relation does not exist.  This is intended to be
     159              :  * used for example in psql to avoid gratuitous errors when there are
     160              :  * concurrent catalog changes.
     161              :  */
     162              : Datum
     163         3202 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
     164              : {
     165         3202 :     Oid         relid = PG_GETARG_OID(0);
     166              :     HeapTuple   tuple;
     167              :     bool        result;
     168              : 
     169         3202 :     tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
     170         3202 :     if (!HeapTupleIsValid(tuple))
     171            0 :         PG_RETURN_NULL();
     172         3202 :     result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
     173         3202 :     ReleaseSysCache(tuple);
     174         3202 :     PG_RETURN_BOOL(result);
     175              : }
     176              : 
     177              : /*
     178              :  * Returns true if the ancestor is in the list of published relations.
     179              :  * Otherwise, returns false.
     180              :  */
     181              : static bool
     182          101 : is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
     183              : {
     184              :     ListCell   *lc;
     185              : 
     186          348 :     foreach(lc, table_infos)
     187              :     {
     188          287 :         Oid         relid = ((published_rel *) lfirst(lc))->relid;
     189              : 
     190          287 :         if (relid == ancestor)
     191           40 :             return true;
     192              :     }
     193              : 
     194           61 :     return false;
     195              : }
     196              : 
     197              : /*
     198              :  * Filter out the partitions whose parent tables are also present in the list.
     199              :  */
     200              : static void
     201          176 : filter_partitions(List *table_infos)
     202              : {
     203              :     ListCell   *lc;
     204              : 
     205          517 :     foreach(lc, table_infos)
     206              :     {
     207          341 :         bool        skip = false;
     208          341 :         List       *ancestors = NIL;
     209              :         ListCell   *lc2;
     210          341 :         published_rel *table_info = (published_rel *) lfirst(lc);
     211              : 
     212          341 :         if (get_rel_relispartition(table_info->relid))
     213          101 :             ancestors = get_partition_ancestors(table_info->relid);
     214              : 
     215          402 :         foreach(lc2, ancestors)
     216              :         {
     217          101 :             Oid         ancestor = lfirst_oid(lc2);
     218              : 
     219          101 :             if (is_ancestor_member_tableinfos(ancestor, table_infos))
     220              :             {
     221           40 :                 skip = true;
     222           40 :                 break;
     223              :             }
     224              :         }
     225              : 
     226          341 :         if (skip)
     227           40 :             table_infos = foreach_delete_current(table_infos, lc);
     228              :     }
     229          176 : }
     230              : 
     231              : /*
     232              :  * Returns true if any schema is associated with the publication, false if no
     233              :  * schema is associated with the publication.
     234              :  */
     235              : bool
     236          142 : is_schema_publication(Oid pubid)
     237              : {
     238              :     Relation    pubschsrel;
     239              :     ScanKeyData scankey;
     240              :     SysScanDesc scan;
     241              :     HeapTuple   tup;
     242          142 :     bool        result = false;
     243              : 
     244          142 :     pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
     245          142 :     ScanKeyInit(&scankey,
     246              :                 Anum_pg_publication_namespace_pnpubid,
     247              :                 BTEqualStrategyNumber, F_OIDEQ,
     248              :                 ObjectIdGetDatum(pubid));
     249              : 
     250          142 :     scan = systable_beginscan(pubschsrel,
     251              :                               PublicationNamespacePnnspidPnpubidIndexId,
     252              :                               true, NULL, 1, &scankey);
     253          142 :     tup = systable_getnext(scan);
     254          142 :     result = HeapTupleIsValid(tup);
     255              : 
     256          142 :     systable_endscan(scan);
     257          142 :     table_close(pubschsrel, AccessShareLock);
     258              : 
     259          142 :     return result;
     260              : }
     261              : 
     262              : /*
     263              :  * Returns true if the relation has column list associated with the
     264              :  * publication, false otherwise.
     265              :  *
     266              :  * If a column list is found, the corresponding bitmap is returned through the
     267              :  * cols parameter, if provided. The bitmap is constructed within the given
     268              :  * memory context (mcxt).
     269              :  */
     270              : bool
     271          800 : check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt,
     272              :                             Bitmapset **cols)
     273              : {
     274              :     HeapTuple   cftuple;
     275          800 :     bool        found = false;
     276              : 
     277          800 :     if (pub->alltables)
     278          195 :         return false;
     279              : 
     280          605 :     cftuple = SearchSysCache2(PUBLICATIONRELMAP,
     281              :                               ObjectIdGetDatum(relid),
     282              :                               ObjectIdGetDatum(pub->oid));
     283          605 :     if (HeapTupleIsValid(cftuple))
     284              :     {
     285              :         Datum       cfdatum;
     286              :         bool        isnull;
     287              : 
     288              :         /* Lookup the column list attribute. */
     289          554 :         cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
     290              :                                   Anum_pg_publication_rel_prattrs, &isnull);
     291              : 
     292              :         /* Was a column list found? */
     293          554 :         if (!isnull)
     294              :         {
     295              :             /* Build the column list bitmap in the given memory context. */
     296          157 :             if (cols)
     297          154 :                 *cols = pub_collist_to_bitmapset(*cols, cfdatum, mcxt);
     298              : 
     299          157 :             found = true;
     300              :         }
     301              : 
     302          554 :         ReleaseSysCache(cftuple);
     303              :     }
     304              : 
     305          605 :     return found;
     306              : }
     307              : 
     308              : /*
     309              :  * Gets the relations based on the publication partition option for a specified
     310              :  * relation.
     311              :  */
     312              : List *
     313         2932 : GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
     314              :                                Oid relid)
     315              : {
     316         2932 :     if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
     317              :         pub_partopt != PUBLICATION_PART_ROOT)
     318          594 :     {
     319          594 :         List       *all_parts = find_all_inheritors(relid, NoLock,
     320              :                                                     NULL);
     321              : 
     322          594 :         if (pub_partopt == PUBLICATION_PART_ALL)
     323          493 :             result = list_concat(result, all_parts);
     324          101 :         else if (pub_partopt == PUBLICATION_PART_LEAF)
     325              :         {
     326              :             ListCell   *lc;
     327              : 
     328          369 :             foreach(lc, all_parts)
     329              :             {
     330          268 :                 Oid         partOid = lfirst_oid(lc);
     331              : 
     332          268 :                 if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
     333          167 :                     result = lappend_oid(result, partOid);
     334              :             }
     335              :         }
     336              :         else
     337              :             Assert(false);
     338              :     }
     339              :     else
     340         2338 :         result = lappend_oid(result, relid);
     341              : 
     342         2932 :     return result;
     343              : }
     344              : 
     345              : /*
     346              :  * Returns the relid of the topmost ancestor that is published via this
     347              :  * publication if any and set its ancestor level to ancestor_level,
     348              :  * otherwise returns InvalidOid.
     349              :  *
     350              :  * The ancestor_level value allows us to compare the results for multiple
     351              :  * publications, and decide which value is higher up.
     352              :  *
     353              :  * Note that the list of ancestors should be ordered such that the topmost
     354              :  * ancestor is at the end of the list.
     355              :  */
     356              : Oid
     357          255 : GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
     358              : {
     359              :     ListCell   *lc;
     360          255 :     Oid         topmost_relid = InvalidOid;
     361          255 :     int         level = 0;
     362              : 
     363              :     /*
     364              :      * Find the "topmost" ancestor that is in this publication.
     365              :      */
     366          518 :     foreach(lc, ancestors)
     367              :     {
     368          263 :         Oid         ancestor = lfirst_oid(lc);
     369          263 :         List       *apubids = GetRelationPublications(ancestor);
     370          263 :         List       *aschemaPubids = NIL;
     371              : 
     372          263 :         level++;
     373              : 
     374          263 :         if (list_member_oid(apubids, puboid))
     375              :         {
     376          156 :             topmost_relid = ancestor;
     377              : 
     378          156 :             if (ancestor_level)
     379           43 :                 *ancestor_level = level;
     380              :         }
     381              :         else
     382              :         {
     383          107 :             aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
     384          107 :             if (list_member_oid(aschemaPubids, puboid))
     385              :             {
     386            5 :                 topmost_relid = ancestor;
     387              : 
     388            5 :                 if (ancestor_level)
     389            5 :                     *ancestor_level = level;
     390              :             }
     391              :         }
     392              : 
     393          263 :         list_free(apubids);
     394          263 :         list_free(aschemaPubids);
     395              :     }
     396              : 
     397          255 :     return topmost_relid;
     398              : }
     399              : 
     400              : /*
     401              :  * attnumstoint2vector
     402              :  *      Convert a Bitmapset of AttrNumbers into an int2vector.
     403              :  *
     404              :  * AttrNumber numbers are 0-based, i.e., not offset by
     405              :  * FirstLowInvalidHeapAttributeNumber.
     406              :  */
     407              : static int2vector *
     408          166 : attnumstoint2vector(Bitmapset *attrs)
     409              : {
     410              :     int2vector *result;
     411          166 :     int         n = bms_num_members(attrs);
     412          166 :     int         i = -1;
     413          166 :     int         j = 0;
     414              : 
     415          166 :     result = buildint2vector(NULL, n);
     416              : 
     417          452 :     while ((i = bms_next_member(attrs, i)) >= 0)
     418              :     {
     419              :         Assert(i <= PG_INT16_MAX);
     420              : 
     421          286 :         result->values[j++] = (int16) i;
     422              :     }
     423              : 
     424          166 :     return result;
     425              : }
     426              : 
     427              : /*
     428              :  * Insert new publication / relation mapping.
     429              :  */
     430              : ObjectAddress
     431          577 : publication_add_relation(Oid pubid, PublicationRelInfo *pri,
     432              :                          bool if_not_exists)
     433              : {
     434              :     Relation    rel;
     435              :     HeapTuple   tup;
     436              :     Datum       values[Natts_pg_publication_rel];
     437              :     bool        nulls[Natts_pg_publication_rel];
     438          577 :     Relation    targetrel = pri->relation;
     439          577 :     Oid         relid = RelationGetRelid(targetrel);
     440              :     Oid         pubreloid;
     441              :     Bitmapset  *attnums;
     442          577 :     Publication *pub = GetPublication(pubid);
     443              :     ObjectAddress myself,
     444              :                 referenced;
     445          577 :     List       *relids = NIL;
     446              :     int         i;
     447              : 
     448          577 :     rel = table_open(PublicationRelRelationId, RowExclusiveLock);
     449              : 
     450              :     /*
     451              :      * Check for duplicates. Note that this does not really prevent
     452              :      * duplicates, it's here just to provide nicer error message in common
     453              :      * case. The real protection is the unique key on the catalog.
     454              :      */
     455          577 :     if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
     456              :                               ObjectIdGetDatum(pubid)))
     457              :     {
     458           11 :         table_close(rel, RowExclusiveLock);
     459              : 
     460           11 :         if (if_not_exists)
     461            8 :             return InvalidObjectAddress;
     462              : 
     463            3 :         ereport(ERROR,
     464              :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     465              :                  errmsg("relation \"%s\" is already member of publication \"%s\"",
     466              :                         RelationGetRelationName(targetrel), pub->name)));
     467              :     }
     468              : 
     469          566 :     check_publication_add_relation(targetrel);
     470              : 
     471              :     /* Validate and translate column names into a Bitmapset of attnums. */
     472          550 :     attnums = pub_collist_validate(pri->relation, pri->columns);
     473              : 
     474              :     /* Form a tuple. */
     475          538 :     memset(values, 0, sizeof(values));
     476          538 :     memset(nulls, false, sizeof(nulls));
     477              : 
     478          538 :     pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
     479              :                                    Anum_pg_publication_rel_oid);
     480          538 :     values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid);
     481          538 :     values[Anum_pg_publication_rel_prpubid - 1] =
     482          538 :         ObjectIdGetDatum(pubid);
     483          538 :     values[Anum_pg_publication_rel_prrelid - 1] =
     484          538 :         ObjectIdGetDatum(relid);
     485              : 
     486              :     /* Add qualifications, if available */
     487          538 :     if (pri->whereClause != NULL)
     488          165 :         values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
     489              :     else
     490          373 :         nulls[Anum_pg_publication_rel_prqual - 1] = true;
     491              : 
     492              :     /* Add column list, if available */
     493          538 :     if (pri->columns)
     494          166 :         values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(attnumstoint2vector(attnums));
     495              :     else
     496          372 :         nulls[Anum_pg_publication_rel_prattrs - 1] = true;
     497              : 
     498          538 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     499              : 
     500              :     /* Insert tuple into catalog. */
     501          538 :     CatalogTupleInsert(rel, tup);
     502          538 :     heap_freetuple(tup);
     503              : 
     504              :     /* Register dependencies as needed */
     505          538 :     ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
     506              : 
     507              :     /* Add dependency on the publication */
     508          538 :     ObjectAddressSet(referenced, PublicationRelationId, pubid);
     509          538 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     510              : 
     511              :     /* Add dependency on the relation */
     512          538 :     ObjectAddressSet(referenced, RelationRelationId, relid);
     513          538 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     514              : 
     515              :     /* Add dependency on the objects mentioned in the qualifications */
     516          538 :     if (pri->whereClause)
     517          165 :         recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
     518              :                                         DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
     519              :                                         false);
     520              : 
     521              :     /* Add dependency on the columns, if any are listed */
     522          538 :     i = -1;
     523          824 :     while ((i = bms_next_member(attnums, i)) >= 0)
     524              :     {
     525          286 :         ObjectAddressSubSet(referenced, RelationRelationId, relid, i);
     526          286 :         recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
     527              :     }
     528              : 
     529              :     /* Close the table. */
     530          538 :     table_close(rel, RowExclusiveLock);
     531              : 
     532              :     /*
     533              :      * Invalidate relcache so that publication info is rebuilt.
     534              :      *
     535              :      * For the partitioned tables, we must invalidate all partitions contained
     536              :      * in the respective partition hierarchies, not just the one explicitly
     537              :      * mentioned in the publication. This is required because we implicitly
     538              :      * publish the child tables when the parent table is published.
     539              :      */
     540          538 :     relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
     541              :                                             relid);
     542              : 
     543          538 :     InvalidatePublicationRels(relids);
     544              : 
     545          538 :     return myself;
     546              : }
     547              : 
     548              : /*
     549              :  * pub_collist_validate
     550              :  *      Process and validate the 'columns' list and ensure the columns are all
     551              :  *      valid to use for a publication.  Checks for and raises an ERROR for
     552              :  *      any unknown columns, system columns, duplicate columns, or virtual
     553              :  *      generated columns.
     554              :  *
     555              :  * Looks up each column's attnum and returns a 0-based Bitmapset of the
     556              :  * corresponding attnums.
     557              :  */
     558              : Bitmapset *
     559          752 : pub_collist_validate(Relation targetrel, List *columns)
     560              : {
     561          752 :     Bitmapset  *set = NULL;
     562              :     ListCell   *lc;
     563          752 :     TupleDesc   tupdesc = RelationGetDescr(targetrel);
     564              : 
     565         1174 :     foreach(lc, columns)
     566              :     {
     567          440 :         char       *colname = strVal(lfirst(lc));
     568          440 :         AttrNumber  attnum = get_attnum(RelationGetRelid(targetrel), colname);
     569              : 
     570          440 :         if (attnum == InvalidAttrNumber)
     571            3 :             ereport(ERROR,
     572              :                     errcode(ERRCODE_UNDEFINED_COLUMN),
     573              :                     errmsg("column \"%s\" of relation \"%s\" does not exist",
     574              :                            colname, RelationGetRelationName(targetrel)));
     575              : 
     576          437 :         if (!AttrNumberIsForUserDefinedAttr(attnum))
     577            6 :             ereport(ERROR,
     578              :                     errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     579              :                     errmsg("cannot use system column \"%s\" in publication column list",
     580              :                            colname));
     581              : 
     582          431 :         if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
     583            3 :             ereport(ERROR,
     584              :                     errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     585              :                     errmsg("cannot use virtual generated column \"%s\" in publication column list",
     586              :                            colname));
     587              : 
     588          428 :         if (bms_is_member(attnum, set))
     589            6 :             ereport(ERROR,
     590              :                     errcode(ERRCODE_DUPLICATE_OBJECT),
     591              :                     errmsg("duplicate column \"%s\" in publication column list",
     592              :                            colname));
     593              : 
     594          422 :         set = bms_add_member(set, attnum);
     595              :     }
     596              : 
     597          734 :     return set;
     598              : }
     599              : 
     600              : /*
     601              :  * Transform a column list (represented by an array Datum) to a bitmapset.
     602              :  *
     603              :  * If columns isn't NULL, add the column numbers to that set.
     604              :  *
     605              :  * If mcxt isn't NULL, build the bitmapset in that context.
     606              :  */
     607              : Bitmapset *
     608          222 : pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
     609              : {
     610          222 :     Bitmapset  *result = columns;
     611              :     ArrayType  *arr;
     612              :     int         nelems;
     613              :     int16      *elems;
     614          222 :     MemoryContext oldcxt = NULL;
     615              : 
     616          222 :     arr = DatumGetArrayTypeP(pubcols);
     617          222 :     nelems = ARR_DIMS(arr)[0];
     618          222 :     elems = (int16 *) ARR_DATA_PTR(arr);
     619              : 
     620              :     /* If a memory context was specified, switch to it. */
     621          222 :     if (mcxt)
     622           39 :         oldcxt = MemoryContextSwitchTo(mcxt);
     623              : 
     624          613 :     for (int i = 0; i < nelems; i++)
     625          391 :         result = bms_add_member(result, elems[i]);
     626              : 
     627          222 :     if (mcxt)
     628           39 :         MemoryContextSwitchTo(oldcxt);
     629              : 
     630          222 :     return result;
     631              : }
     632              : 
     633              : /*
     634              :  * Returns a bitmap representing the columns of the specified table.
     635              :  *
     636              :  * Generated columns are included if include_gencols_type is
     637              :  * PUBLISH_GENCOLS_STORED.
     638              :  */
     639              : Bitmapset *
     640            9 : pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type)
     641              : {
     642            9 :     Bitmapset  *result = NULL;
     643            9 :     TupleDesc   desc = RelationGetDescr(relation);
     644              : 
     645           30 :     for (int i = 0; i < desc->natts; i++)
     646              :     {
     647           21 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     648              : 
     649           21 :         if (att->attisdropped)
     650            1 :             continue;
     651              : 
     652           20 :         if (att->attgenerated)
     653              :         {
     654              :             /* We only support replication of STORED generated cols. */
     655            2 :             if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
     656            1 :                 continue;
     657              : 
     658              :             /* User hasn't requested to replicate STORED generated cols. */
     659            1 :             if (include_gencols_type != PUBLISH_GENCOLS_STORED)
     660            1 :                 continue;
     661              :         }
     662              : 
     663           18 :         result = bms_add_member(result, att->attnum);
     664              :     }
     665              : 
     666            9 :     return result;
     667              : }
     668              : 
     669              : /*
     670              :  * Insert new publication / schema mapping.
     671              :  */
     672              : ObjectAddress
     673          131 : publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
     674              : {
     675              :     Relation    rel;
     676              :     HeapTuple   tup;
     677              :     Datum       values[Natts_pg_publication_namespace];
     678              :     bool        nulls[Natts_pg_publication_namespace];
     679              :     Oid         psschid;
     680          131 :     Publication *pub = GetPublication(pubid);
     681          131 :     List       *schemaRels = NIL;
     682              :     ObjectAddress myself,
     683              :                 referenced;
     684              : 
     685          131 :     rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
     686              : 
     687              :     /*
     688              :      * Check for duplicates. Note that this does not really prevent
     689              :      * duplicates, it's here just to provide nicer error message in common
     690              :      * case. The real protection is the unique key on the catalog.
     691              :      */
     692          131 :     if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
     693              :                               ObjectIdGetDatum(schemaid),
     694              :                               ObjectIdGetDatum(pubid)))
     695              :     {
     696            9 :         table_close(rel, RowExclusiveLock);
     697              : 
     698            9 :         if (if_not_exists)
     699            6 :             return InvalidObjectAddress;
     700              : 
     701            3 :         ereport(ERROR,
     702              :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     703              :                  errmsg("schema \"%s\" is already member of publication \"%s\"",
     704              :                         get_namespace_name(schemaid), pub->name)));
     705              :     }
     706              : 
     707          122 :     check_publication_add_schema(schemaid);
     708              : 
     709              :     /* Form a tuple */
     710          119 :     memset(values, 0, sizeof(values));
     711          119 :     memset(nulls, false, sizeof(nulls));
     712              : 
     713          119 :     psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId,
     714              :                                  Anum_pg_publication_namespace_oid);
     715          119 :     values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid);
     716          119 :     values[Anum_pg_publication_namespace_pnpubid - 1] =
     717          119 :         ObjectIdGetDatum(pubid);
     718          119 :     values[Anum_pg_publication_namespace_pnnspid - 1] =
     719          119 :         ObjectIdGetDatum(schemaid);
     720              : 
     721          119 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     722              : 
     723              :     /* Insert tuple into catalog */
     724          119 :     CatalogTupleInsert(rel, tup);
     725          119 :     heap_freetuple(tup);
     726              : 
     727          119 :     ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid);
     728              : 
     729              :     /* Add dependency on the publication */
     730          119 :     ObjectAddressSet(referenced, PublicationRelationId, pubid);
     731          119 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     732              : 
     733              :     /* Add dependency on the schema */
     734          119 :     ObjectAddressSet(referenced, NamespaceRelationId, schemaid);
     735          119 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     736              : 
     737              :     /* Close the table */
     738          119 :     table_close(rel, RowExclusiveLock);
     739              : 
     740              :     /*
     741              :      * Invalidate relcache so that publication info is rebuilt. See
     742              :      * publication_add_relation for why we need to consider all the
     743              :      * partitions.
     744              :      */
     745          119 :     schemaRels = GetSchemaPublicationRelations(schemaid,
     746              :                                                PUBLICATION_PART_ALL);
     747          119 :     InvalidatePublicationRels(schemaRels);
     748              : 
     749          119 :     return myself;
     750              : }
     751              : 
     752              : /* Gets list of publication oids for a relation */
     753              : List *
     754         6837 : GetRelationPublications(Oid relid)
     755              : {
     756         6837 :     List       *result = NIL;
     757              :     CatCList   *pubrellist;
     758              :     int         i;
     759              : 
     760              :     /* Find all publications associated with the relation. */
     761         6837 :     pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
     762              :                                      ObjectIdGetDatum(relid));
     763         7667 :     for (i = 0; i < pubrellist->n_members; i++)
     764              :     {
     765          830 :         HeapTuple   tup = &pubrellist->members[i]->tuple;
     766          830 :         Oid         pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
     767              : 
     768          830 :         result = lappend_oid(result, pubid);
     769              :     }
     770              : 
     771         6837 :     ReleaseSysCacheList(pubrellist);
     772              : 
     773         6837 :     return result;
     774              : }
     775              : 
     776              : /*
     777              :  * Gets list of relation oids for a publication.
     778              :  *
     779              :  * This should only be used FOR TABLE publications, the FOR ALL TABLES/SEQUENCES
     780              :  * should use GetAllPublicationRelations().
     781              :  */
     782              : List *
     783         1202 : GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
     784              : {
     785              :     List       *result;
     786              :     Relation    pubrelsrel;
     787              :     ScanKeyData scankey;
     788              :     SysScanDesc scan;
     789              :     HeapTuple   tup;
     790              : 
     791              :     /* Find all relations associated with the publication. */
     792         1202 :     pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
     793              : 
     794         1202 :     ScanKeyInit(&scankey,
     795              :                 Anum_pg_publication_rel_prpubid,
     796              :                 BTEqualStrategyNumber, F_OIDEQ,
     797              :                 ObjectIdGetDatum(pubid));
     798              : 
     799         1202 :     scan = systable_beginscan(pubrelsrel, PublicationRelPrpubidIndexId,
     800              :                               true, NULL, 1, &scankey);
     801              : 
     802         1202 :     result = NIL;
     803         2763 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     804              :     {
     805              :         Form_pg_publication_rel pubrel;
     806              : 
     807         1561 :         pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
     808         1561 :         result = GetPubPartitionOptionRelations(result, pub_partopt,
     809              :                                                 pubrel->prrelid);
     810              :     }
     811              : 
     812         1202 :     systable_endscan(scan);
     813         1202 :     table_close(pubrelsrel, AccessShareLock);
     814              : 
     815              :     /* Now sort and de-duplicate the result list */
     816         1202 :     list_sort(result, list_oid_cmp);
     817         1202 :     list_deduplicate_oid(result);
     818              : 
     819         1202 :     return result;
     820              : }
     821              : 
     822              : /*
     823              :  * Gets list of publication oids for publications marked as FOR ALL TABLES.
     824              :  */
     825              : List *
     826         4626 : GetAllTablesPublications(void)
     827              : {
     828              :     List       *result;
     829              :     Relation    rel;
     830              :     ScanKeyData scankey;
     831              :     SysScanDesc scan;
     832              :     HeapTuple   tup;
     833              : 
     834              :     /* Find all publications that are marked as for all tables. */
     835         4626 :     rel = table_open(PublicationRelationId, AccessShareLock);
     836              : 
     837         4626 :     ScanKeyInit(&scankey,
     838              :                 Anum_pg_publication_puballtables,
     839              :                 BTEqualStrategyNumber, F_BOOLEQ,
     840              :                 BoolGetDatum(true));
     841              : 
     842         4626 :     scan = systable_beginscan(rel, InvalidOid, false,
     843              :                               NULL, 1, &scankey);
     844              : 
     845         4626 :     result = NIL;
     846         4734 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     847              :     {
     848          108 :         Oid         oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
     849              : 
     850          108 :         result = lappend_oid(result, oid);
     851              :     }
     852              : 
     853         4626 :     systable_endscan(scan);
     854         4626 :     table_close(rel, AccessShareLock);
     855              : 
     856         4626 :     return result;
     857              : }
     858              : 
     859              : /*
     860              :  * Gets list of all relations published by FOR ALL TABLES/SEQUENCES
     861              :  * publication(s).
     862              :  *
     863              :  * If the publication publishes partition changes via their respective root
     864              :  * partitioned tables, we must exclude partitions in favor of including the
     865              :  * root partitioned tables. This is not applicable to FOR ALL SEQUENCES
     866              :  * publication.
     867              :  */
     868              : List *
     869          182 : GetAllPublicationRelations(char relkind, bool pubviaroot)
     870              : {
     871              :     Relation    classRel;
     872              :     ScanKeyData key[1];
     873              :     TableScanDesc scan;
     874              :     HeapTuple   tuple;
     875          182 :     List       *result = NIL;
     876              : 
     877              :     Assert(!(relkind == RELKIND_SEQUENCE && pubviaroot));
     878              : 
     879          182 :     classRel = table_open(RelationRelationId, AccessShareLock);
     880              : 
     881          182 :     ScanKeyInit(&key[0],
     882              :                 Anum_pg_class_relkind,
     883              :                 BTEqualStrategyNumber, F_CHAREQ,
     884              :                 CharGetDatum(relkind));
     885              : 
     886          182 :     scan = table_beginscan_catalog(classRel, 1, key);
     887              : 
     888        12975 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
     889              :     {
     890        12793 :         Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
     891        12793 :         Oid         relid = relForm->oid;
     892              : 
     893        12793 :         if (is_publishable_class(relid, relForm) &&
     894          819 :             !(relForm->relispartition && pubviaroot))
     895          728 :             result = lappend_oid(result, relid);
     896              :     }
     897              : 
     898          182 :     table_endscan(scan);
     899              : 
     900          182 :     if (pubviaroot)
     901              :     {
     902           13 :         ScanKeyInit(&key[0],
     903              :                     Anum_pg_class_relkind,
     904              :                     BTEqualStrategyNumber, F_CHAREQ,
     905              :                     CharGetDatum(RELKIND_PARTITIONED_TABLE));
     906              : 
     907           13 :         scan = table_beginscan_catalog(classRel, 1, key);
     908              : 
     909           78 :         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
     910              :         {
     911           65 :             Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
     912           65 :             Oid         relid = relForm->oid;
     913              : 
     914           65 :             if (is_publishable_class(relid, relForm) &&
     915           65 :                 !relForm->relispartition)
     916           52 :                 result = lappend_oid(result, relid);
     917              :         }
     918              : 
     919           13 :         table_endscan(scan);
     920              :     }
     921              : 
     922          182 :     table_close(classRel, AccessShareLock);
     923          182 :     return result;
     924              : }
     925              : 
     926              : /*
     927              :  * Gets the list of schema oids for a publication.
     928              :  *
     929              :  * This should only be used FOR TABLES IN SCHEMA publications.
     930              :  */
     931              : List *
     932         1156 : GetPublicationSchemas(Oid pubid)
     933              : {
     934         1156 :     List       *result = NIL;
     935              :     Relation    pubschsrel;
     936              :     ScanKeyData scankey;
     937              :     SysScanDesc scan;
     938              :     HeapTuple   tup;
     939              : 
     940              :     /* Find all schemas associated with the publication */
     941         1156 :     pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
     942              : 
     943         1156 :     ScanKeyInit(&scankey,
     944              :                 Anum_pg_publication_namespace_pnpubid,
     945              :                 BTEqualStrategyNumber, F_OIDEQ,
     946              :                 ObjectIdGetDatum(pubid));
     947              : 
     948         1156 :     scan = systable_beginscan(pubschsrel,
     949              :                               PublicationNamespacePnnspidPnpubidIndexId,
     950              :                               true, NULL, 1, &scankey);
     951         1206 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     952              :     {
     953              :         Form_pg_publication_namespace pubsch;
     954              : 
     955           50 :         pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
     956              : 
     957           50 :         result = lappend_oid(result, pubsch->pnnspid);
     958              :     }
     959              : 
     960         1156 :     systable_endscan(scan);
     961         1156 :     table_close(pubschsrel, AccessShareLock);
     962              : 
     963         1156 :     return result;
     964              : }
     965              : 
     966              : /*
     967              :  * Gets the list of publication oids associated with a specified schema.
     968              :  */
     969              : List *
     970         6599 : GetSchemaPublications(Oid schemaid)
     971              : {
     972         6599 :     List       *result = NIL;
     973              :     CatCList   *pubschlist;
     974              :     int         i;
     975              : 
     976              :     /* Find all publications associated with the schema */
     977         6599 :     pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
     978              :                                      ObjectIdGetDatum(schemaid));
     979         6656 :     for (i = 0; i < pubschlist->n_members; i++)
     980              :     {
     981           57 :         HeapTuple   tup = &pubschlist->members[i]->tuple;
     982           57 :         Oid         pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
     983              : 
     984           57 :         result = lappend_oid(result, pubid);
     985              :     }
     986              : 
     987         6599 :     ReleaseSysCacheList(pubschlist);
     988              : 
     989         6599 :     return result;
     990              : }
     991              : 
     992              : /*
     993              :  * Get the list of publishable relation oids for a specified schema.
     994              :  */
     995              : List *
     996          250 : GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
     997              : {
     998              :     Relation    classRel;
     999              :     ScanKeyData key[1];
    1000              :     TableScanDesc scan;
    1001              :     HeapTuple   tuple;
    1002          250 :     List       *result = NIL;
    1003              : 
    1004              :     Assert(OidIsValid(schemaid));
    1005              : 
    1006          250 :     classRel = table_open(RelationRelationId, AccessShareLock);
    1007              : 
    1008          250 :     ScanKeyInit(&key[0],
    1009              :                 Anum_pg_class_relnamespace,
    1010              :                 BTEqualStrategyNumber, F_OIDEQ,
    1011              :                 ObjectIdGetDatum(schemaid));
    1012              : 
    1013              :     /* get all the relations present in the specified schema */
    1014          250 :     scan = table_beginscan_catalog(classRel, 1, key);
    1015        12936 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    1016              :     {
    1017        12686 :         Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
    1018        12686 :         Oid         relid = relForm->oid;
    1019              :         char        relkind;
    1020              : 
    1021        12686 :         if (!is_publishable_class(relid, relForm))
    1022         4351 :             continue;
    1023              : 
    1024         8335 :         relkind = get_rel_relkind(relid);
    1025         8335 :         if (relkind == RELKIND_RELATION)
    1026         7150 :             result = lappend_oid(result, relid);
    1027         1185 :         else if (relkind == RELKIND_PARTITIONED_TABLE)
    1028              :         {
    1029          391 :             List       *partitionrels = NIL;
    1030              : 
    1031              :             /*
    1032              :              * It is quite possible that some of the partitions are in a
    1033              :              * different schema than the parent table, so we need to get such
    1034              :              * partitions separately.
    1035              :              */
    1036          391 :             partitionrels = GetPubPartitionOptionRelations(partitionrels,
    1037              :                                                            pub_partopt,
    1038              :                                                            relForm->oid);
    1039          391 :             result = list_concat_unique_oid(result, partitionrels);
    1040              :         }
    1041              :     }
    1042              : 
    1043          250 :     table_endscan(scan);
    1044          250 :     table_close(classRel, AccessShareLock);
    1045          250 :     return result;
    1046              : }
    1047              : 
    1048              : /*
    1049              :  * Gets the list of all relations published by FOR TABLES IN SCHEMA
    1050              :  * publication.
    1051              :  */
    1052              : List *
    1053          954 : GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
    1054              : {
    1055          954 :     List       *result = NIL;
    1056          954 :     List       *pubschemalist = GetPublicationSchemas(pubid);
    1057              :     ListCell   *cell;
    1058              : 
    1059          989 :     foreach(cell, pubschemalist)
    1060              :     {
    1061           35 :         Oid         schemaid = lfirst_oid(cell);
    1062           35 :         List       *schemaRels = NIL;
    1063              : 
    1064           35 :         schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt);
    1065           35 :         result = list_concat(result, schemaRels);
    1066              :     }
    1067              : 
    1068          954 :     return result;
    1069              : }
    1070              : 
    1071              : /*
    1072              :  * Get publication using oid
    1073              :  *
    1074              :  * The Publication struct and its data are palloc'ed here.
    1075              :  */
    1076              : Publication *
    1077         4810 : GetPublication(Oid pubid)
    1078              : {
    1079              :     HeapTuple   tup;
    1080              :     Publication *pub;
    1081              :     Form_pg_publication pubform;
    1082              : 
    1083         4810 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1084         4810 :     if (!HeapTupleIsValid(tup))
    1085            0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
    1086              : 
    1087         4810 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1088              : 
    1089         4810 :     pub = palloc_object(Publication);
    1090         4810 :     pub->oid = pubid;
    1091         4810 :     pub->name = pstrdup(NameStr(pubform->pubname));
    1092         4810 :     pub->alltables = pubform->puballtables;
    1093         4810 :     pub->allsequences = pubform->puballsequences;
    1094         4810 :     pub->pubactions.pubinsert = pubform->pubinsert;
    1095         4810 :     pub->pubactions.pubupdate = pubform->pubupdate;
    1096         4810 :     pub->pubactions.pubdelete = pubform->pubdelete;
    1097         4810 :     pub->pubactions.pubtruncate = pubform->pubtruncate;
    1098         4810 :     pub->pubviaroot = pubform->pubviaroot;
    1099         4810 :     pub->pubgencols_type = pubform->pubgencols;
    1100              : 
    1101         4810 :     ReleaseSysCache(tup);
    1102              : 
    1103         4810 :     return pub;
    1104              : }
    1105              : 
    1106              : /*
    1107              :  * Get Publication using name.
    1108              :  */
    1109              : Publication *
    1110         1539 : GetPublicationByName(const char *pubname, bool missing_ok)
    1111              : {
    1112              :     Oid         oid;
    1113              : 
    1114         1539 :     oid = get_publication_oid(pubname, missing_ok);
    1115              : 
    1116         1538 :     return OidIsValid(oid) ? GetPublication(oid) : NULL;
    1117              : }
    1118              : 
    1119              : /*
    1120              :  * Get information of the tables in the given publication array.
    1121              :  *
    1122              :  * Returns pubid, relid, column list, row filter for each table.
    1123              :  */
    1124              : Datum
    1125         3115 : pg_get_publication_tables(PG_FUNCTION_ARGS)
    1126              : {
    1127              : #define NUM_PUBLICATION_TABLES_ELEM 4
    1128              :     FuncCallContext *funcctx;
    1129         3115 :     List       *table_infos = NIL;
    1130              : 
    1131              :     /* stuff done only on the first call of the function */
    1132         3115 :     if (SRF_IS_FIRSTCALL())
    1133              :     {
    1134              :         TupleDesc   tupdesc;
    1135              :         MemoryContext oldcontext;
    1136              :         ArrayType  *arr;
    1137              :         Datum      *elems;
    1138              :         int         nelems,
    1139              :                     i;
    1140         1021 :         bool        viaroot = false;
    1141              : 
    1142              :         /* create a function context for cross-call persistence */
    1143         1021 :         funcctx = SRF_FIRSTCALL_INIT();
    1144              : 
    1145              :         /* switch to memory context appropriate for multiple function calls */
    1146         1021 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    1147              : 
    1148              :         /*
    1149              :          * Deconstruct the parameter into elements where each element is a
    1150              :          * publication name.
    1151              :          */
    1152         1021 :         arr = PG_GETARG_ARRAYTYPE_P(0);
    1153         1021 :         deconstruct_array_builtin(arr, TEXTOID, &elems, NULL, &nelems);
    1154              : 
    1155              :         /* Get Oids of tables from each publication. */
    1156         2088 :         for (i = 0; i < nelems; i++)
    1157              :         {
    1158              :             Publication *pub_elem;
    1159         1068 :             List       *pub_elem_tables = NIL;
    1160              :             ListCell   *lc;
    1161              : 
    1162         1068 :             pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]), false);
    1163              : 
    1164              :             /*
    1165              :              * Publications support partitioned tables. If
    1166              :              * publish_via_partition_root is false, all changes are replicated
    1167              :              * using leaf partition identity and schema, so we only need
    1168              :              * those. Otherwise, get the partitioned table itself.
    1169              :              */
    1170         1067 :             if (pub_elem->alltables)
    1171          176 :                 pub_elem_tables = GetAllPublicationRelations(RELKIND_RELATION,
    1172          176 :                                                              pub_elem->pubviaroot);
    1173              :             else
    1174              :             {
    1175              :                 List       *relids,
    1176              :                            *schemarelids;
    1177              : 
    1178          891 :                 relids = GetPublicationRelations(pub_elem->oid,
    1179          891 :                                                  pub_elem->pubviaroot ?
    1180          891 :                                                  PUBLICATION_PART_ROOT :
    1181              :                                                  PUBLICATION_PART_LEAF);
    1182          891 :                 schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
    1183          891 :                                                                 pub_elem->pubviaroot ?
    1184          891 :                                                                 PUBLICATION_PART_ROOT :
    1185              :                                                                 PUBLICATION_PART_LEAF);
    1186          891 :                 pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
    1187              :             }
    1188              : 
    1189              :             /*
    1190              :              * Record the published table and the corresponding publication so
    1191              :              * that we can get row filters and column lists later.
    1192              :              *
    1193              :              * When a table is published by multiple publications, to obtain
    1194              :              * all row filters and column lists, the structure related to this
    1195              :              * table will be recorded multiple times.
    1196              :              */
    1197         3201 :             foreach(lc, pub_elem_tables)
    1198              :             {
    1199         2134 :                 published_rel *table_info = palloc_object(published_rel);
    1200              : 
    1201         2134 :                 table_info->relid = lfirst_oid(lc);
    1202         2134 :                 table_info->pubid = pub_elem->oid;
    1203         2134 :                 table_infos = lappend(table_infos, table_info);
    1204              :             }
    1205              : 
    1206              :             /* At least one publication is using publish_via_partition_root. */
    1207         1067 :             if (pub_elem->pubviaroot)
    1208          184 :                 viaroot = true;
    1209              :         }
    1210              : 
    1211              :         /*
    1212              :          * If the publication publishes partition changes via their respective
    1213              :          * root partitioned tables, we must exclude partitions in favor of
    1214              :          * including the root partitioned tables. Otherwise, the function
    1215              :          * could return both the child and parent tables which could cause
    1216              :          * data of the child table to be double-published on the subscriber
    1217              :          * side.
    1218              :          */
    1219         1020 :         if (viaroot)
    1220          176 :             filter_partitions(table_infos);
    1221              : 
    1222              :         /* Construct a tuple descriptor for the result rows. */
    1223         1020 :         tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
    1224         1020 :         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
    1225              :                            OIDOID, -1, 0);
    1226         1020 :         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
    1227              :                            OIDOID, -1, 0);
    1228         1020 :         TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
    1229              :                            INT2VECTOROID, -1, 0);
    1230         1020 :         TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
    1231              :                            PG_NODE_TREEOID, -1, 0);
    1232              : 
    1233         1020 :         funcctx->tuple_desc = BlessTupleDesc(tupdesc);
    1234         1020 :         funcctx->user_fctx = table_infos;
    1235              : 
    1236         1020 :         MemoryContextSwitchTo(oldcontext);
    1237              :     }
    1238              : 
    1239              :     /* stuff done on every call of the function */
    1240         3114 :     funcctx = SRF_PERCALL_SETUP();
    1241         3114 :     table_infos = (List *) funcctx->user_fctx;
    1242              : 
    1243         3114 :     if (funcctx->call_cntr < list_length(table_infos))
    1244              :     {
    1245         2094 :         HeapTuple   pubtuple = NULL;
    1246              :         HeapTuple   rettuple;
    1247              :         Publication *pub;
    1248         2094 :         published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
    1249         2094 :         Oid         relid = table_info->relid;
    1250         2094 :         Oid         schemaid = get_rel_namespace(relid);
    1251         2094 :         Datum       values[NUM_PUBLICATION_TABLES_ELEM] = {0};
    1252         2094 :         bool        nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
    1253              : 
    1254              :         /*
    1255              :          * Form tuple with appropriate data.
    1256              :          */
    1257              : 
    1258         2094 :         pub = GetPublication(table_info->pubid);
    1259              : 
    1260         2094 :         values[0] = ObjectIdGetDatum(pub->oid);
    1261         2094 :         values[1] = ObjectIdGetDatum(relid);
    1262              : 
    1263              :         /*
    1264              :          * We don't consider row filters or column lists for FOR ALL TABLES or
    1265              :          * FOR TABLES IN SCHEMA publications.
    1266              :          */
    1267         2094 :         if (!pub->alltables &&
    1268         1331 :             !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
    1269              :                                    ObjectIdGetDatum(schemaid),
    1270              :                                    ObjectIdGetDatum(pub->oid)))
    1271         1284 :             pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
    1272              :                                            ObjectIdGetDatum(relid),
    1273              :                                            ObjectIdGetDatum(pub->oid));
    1274              : 
    1275         2094 :         if (HeapTupleIsValid(pubtuple))
    1276              :         {
    1277              :             /* Lookup the column list attribute. */
    1278         1171 :             values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
    1279              :                                         Anum_pg_publication_rel_prattrs,
    1280              :                                         &(nulls[2]));
    1281              : 
    1282              :             /* Null indicates no filter. */
    1283         1171 :             values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
    1284              :                                         Anum_pg_publication_rel_prqual,
    1285              :                                         &(nulls[3]));
    1286              :         }
    1287              :         else
    1288              :         {
    1289          923 :             nulls[2] = true;
    1290          923 :             nulls[3] = true;
    1291              :         }
    1292              : 
    1293              :         /* Show all columns when the column list is not specified. */
    1294         2094 :         if (nulls[2])
    1295              :         {
    1296         1968 :             Relation    rel = table_open(relid, AccessShareLock);
    1297         1968 :             int         nattnums = 0;
    1298              :             int16      *attnums;
    1299         1968 :             TupleDesc   desc = RelationGetDescr(rel);
    1300              :             int         i;
    1301              : 
    1302         1968 :             attnums = palloc_array(int16, desc->natts);
    1303              : 
    1304         5394 :             for (i = 0; i < desc->natts; i++)
    1305              :             {
    1306         3426 :                 Form_pg_attribute att = TupleDescAttr(desc, i);
    1307              : 
    1308         3426 :                 if (att->attisdropped)
    1309            6 :                     continue;
    1310              : 
    1311         3420 :                 if (att->attgenerated)
    1312              :                 {
    1313              :                     /* We only support replication of STORED generated cols. */
    1314           48 :                     if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
    1315           36 :                         continue;
    1316              : 
    1317              :                     /*
    1318              :                      * User hasn't requested to replicate STORED generated
    1319              :                      * cols.
    1320              :                      */
    1321           12 :                     if (pub->pubgencols_type != PUBLISH_GENCOLS_STORED)
    1322            9 :                         continue;
    1323              :                 }
    1324              : 
    1325         3375 :                 attnums[nattnums++] = att->attnum;
    1326              :             }
    1327              : 
    1328         1968 :             if (nattnums > 0)
    1329              :             {
    1330         1946 :                 values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
    1331         1946 :                 nulls[2] = false;
    1332              :             }
    1333              : 
    1334         1968 :             table_close(rel, AccessShareLock);
    1335              :         }
    1336              : 
    1337         2094 :         rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
    1338              : 
    1339         2094 :         SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
    1340              :     }
    1341              : 
    1342         1020 :     SRF_RETURN_DONE(funcctx);
    1343              : }
    1344              : 
    1345              : /*
    1346              :  * Returns Oids of sequences in a publication.
    1347              :  */
    1348              : Datum
    1349          226 : pg_get_publication_sequences(PG_FUNCTION_ARGS)
    1350              : {
    1351              :     FuncCallContext *funcctx;
    1352          226 :     List       *sequences = NIL;
    1353              : 
    1354              :     /* stuff done only on the first call of the function */
    1355          226 :     if (SRF_IS_FIRSTCALL())
    1356              :     {
    1357          209 :         char       *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1358              :         Publication *publication;
    1359              :         MemoryContext oldcontext;
    1360              : 
    1361              :         /* create a function context for cross-call persistence */
    1362          209 :         funcctx = SRF_FIRSTCALL_INIT();
    1363              : 
    1364              :         /* switch to memory context appropriate for multiple function calls */
    1365          209 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    1366              : 
    1367          209 :         publication = GetPublicationByName(pubname, false);
    1368              : 
    1369          209 :         if (publication->allsequences)
    1370            6 :             sequences = GetAllPublicationRelations(RELKIND_SEQUENCE, false);
    1371              : 
    1372          209 :         funcctx->user_fctx = sequences;
    1373              : 
    1374          209 :         MemoryContextSwitchTo(oldcontext);
    1375              :     }
    1376              : 
    1377              :     /* stuff done on every call of the function */
    1378          226 :     funcctx = SRF_PERCALL_SETUP();
    1379          226 :     sequences = (List *) funcctx->user_fctx;
    1380              : 
    1381          226 :     if (funcctx->call_cntr < list_length(sequences))
    1382              :     {
    1383           17 :         Oid         relid = list_nth_oid(sequences, funcctx->call_cntr);
    1384              : 
    1385           17 :         SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
    1386              :     }
    1387              : 
    1388          209 :     SRF_RETURN_DONE(funcctx);
    1389              : }
        

Generated by: LCOV version 2.0-1