LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_publication.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 437 440 99.3 %
Date: 2025-02-22 07:14:56 Functions: 28 28 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_publication.c
       4             :  *      publication C API manipulation
       5             :  *
       6             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      src/backend/catalog/pg_publication.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/genam.h"
      18             : #include "access/heapam.h"
      19             : #include "access/htup_details.h"
      20             : #include "access/tableam.h"
      21             : #include "catalog/catalog.h"
      22             : #include "catalog/dependency.h"
      23             : #include "catalog/indexing.h"
      24             : #include "catalog/namespace.h"
      25             : #include "catalog/objectaddress.h"
      26             : #include "catalog/partition.h"
      27             : #include "catalog/pg_inherits.h"
      28             : #include "catalog/pg_namespace.h"
      29             : #include "catalog/pg_publication.h"
      30             : #include "catalog/pg_publication_namespace.h"
      31             : #include "catalog/pg_publication_rel.h"
      32             : #include "catalog/pg_type.h"
      33             : #include "commands/publicationcmds.h"
      34             : #include "funcapi.h"
      35             : #include "utils/array.h"
      36             : #include "utils/builtins.h"
      37             : #include "utils/catcache.h"
      38             : #include "utils/fmgroids.h"
      39             : #include "utils/lsyscache.h"
      40             : #include "utils/rel.h"
      41             : #include "utils/syscache.h"
      42             : 
      43             : /* Records the association between a publication and a table it publishes */
      44             : typedef struct
      45             : {
      46             :     Oid         relid;          /* OID of published table */
      47             :     Oid         pubid;          /* OID of publication that publishes this
      48             :                                  * table. */
      49             : } published_rel;
      50             : 
      51             : /*
      52             :  * Check whether the relation can be added to a publication, and throw an
      53             :  * appropriate error if not.
      54             :  */
      55             : static void
      56        1070 : check_publication_add_relation(Relation targetrel)
      57             : {
      58             :     /* Must be a regular or partitioned table */
      59        1070 :     if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
      60         144 :         RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
      61          14 :         ereport(ERROR,
      62             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      63             :                  errmsg("cannot add relation \"%s\" to publication",
      64             :                         RelationGetRelationName(targetrel)),
      65             :                  errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind)));
      66             : 
      67             :     /* Can't be a system table */
      68        1056 :     if (IsCatalogRelation(targetrel))
      69           6 :         ereport(ERROR,
      70             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      71             :                  errmsg("cannot add relation \"%s\" to publication",
      72             :                         RelationGetRelationName(targetrel)),
      73             :                  errdetail("This operation is not supported for system tables.")));
      74             : 
      75             :     /* UNLOGGED and TEMP relations cannot be part of a publication. */
      76        1050 :     if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
      77           6 :         ereport(ERROR,
      78             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      79             :                  errmsg("cannot add relation \"%s\" to publication",
      80             :                         RelationGetRelationName(targetrel)),
      81             :                  errdetail("This operation is not supported for temporary tables.")));
      82        1044 :     else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
      83           6 :         ereport(ERROR,
      84             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      85             :                  errmsg("cannot add relation \"%s\" to publication",
      86             :                         RelationGetRelationName(targetrel)),
      87             :                  errdetail("This operation is not supported for unlogged tables.")));
      88        1038 : }
      89             : 
      90             : /*
      91             :  * Check whether the schema can be added to a publication, and throw an
      92             :  * appropriate error if not.
      93             :  */
      94             : static void
      95         204 : check_publication_add_schema(Oid schemaid)
      96             : {
      97             :     /* Can't be a system namespace (catalog or toast) */
      98         204 :     if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid))
      99           6 :         ereport(ERROR,
     100             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     101             :                  errmsg("cannot add schema \"%s\" to publication",
     102             :                         get_namespace_name(schemaid)),
     103             :                  errdetail("This operation is not supported for system schemas.")));
     104             : 
     105             :     /* Can't be a temporary namespace */
     106         198 :     if (isAnyTempNamespace(schemaid))
     107           0 :         ereport(ERROR,
     108             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     109             :                  errmsg("cannot add schema \"%s\" to publication",
     110             :                         get_namespace_name(schemaid)),
     111             :                  errdetail("Temporary schemas cannot be replicated.")));
     112         198 : }
     113             : 
     114             : /*
     115             :  * Returns true if the relation represented by relid and its Form_pg_class
     116             :  * entry is publishable.
     117             :  *
     118             :  * Does the same checks as check_publication_add_relation() above, but does
     119             :  * not need the relation to be opened and also does not throw errors.
     120             :  *
     121             :  * XXX  This also excludes all tables with relid < FirstNormalObjectId,
     122             :  * ie all tables created during initdb.  This mainly affects the preinstalled
     123             :  * information_schema.  IsCatalogRelationOid() only excludes tables with
     124             :  * relid < FirstUnpinnedObjectId, making that test rather redundant,
     125             :  * but really we should get rid of the FirstNormalObjectId test not
     126             :  * IsCatalogRelationOid.  We can't do so today because we don't want
     127             :  * information_schema tables to be considered publishable; but this test
     128             :  * is really inadequate for that, since the information_schema could be
     129             :  * dropped and reloaded and then it'll be considered publishable.  The best
     130             :  * long-term solution may be to add a "relispublishable" bool to pg_class,
     131             :  * and depend on that instead of OID checks.
     132             :  */
     133             : static bool
     134      593930 : is_publishable_class(Oid relid, Form_pg_class reltuple)
     135             : {
     136      605942 :     return (reltuple->relkind == RELKIND_RELATION ||
     137       12012 :             reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
     138      583350 :         !IsCatalogRelationOid(relid) &&
     139     1187860 :         reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
     140             :         relid >= FirstNormalObjectId;
     141             : }
     142             : 
     143             : /*
     144             :  * Variant of is_publishable_class() taking a Relation instead of OID + tuple.
     145             :  */
     146             : bool
     147      539148 : is_publishable_relation(Relation rel)
     148             : {
     149      539148 :     return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
     150             : }
     151             : 
     152             : /*
     153             :  * SQL-callable variant of is_publishable_relation(), taking a relation OID
     154             :  *
     155             :  * This returns null when the relation does not exist.  This is intended to be
     156             :  * used for example in psql to avoid gratuitous errors when there are
     157             :  * concurrent catalog changes.
     158             :  */
     159             : Datum
     160        5960 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
     161             : {
     162        5960 :     Oid         relid = PG_GETARG_OID(0);
     163             :     HeapTuple   tuple;
     164             :     bool        result;
     165             : 
     166        5960 :     tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
     167        5960 :     if (!HeapTupleIsValid(tuple))
     168           0 :         PG_RETURN_NULL();
     169        5960 :     result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
     170        5960 :     ReleaseSysCache(tuple);
     171        5960 :     PG_RETURN_BOOL(result);
     172             : }
     173             : 
     174             : /*
     175             :  * Returns true if the ancestor matches the relid of any published_rel entry
     176             :  * in table_infos.  Otherwise, returns false.
     177             :  */
     178             : static bool
     179         202 : is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
     180             : {
     181             :     ListCell   *lc;
     182             : 
     183         690 :     foreach(lc, table_infos)
     184             :     {
     185         568 :         Oid         relid = ((published_rel *) lfirst(lc))->relid;
     186             : 
     187         568 :         if (relid == ancestor)
     188          80 :             return true;
     189             :     }
     190             : 
     191         122 :     return false;
     192             : }
     193             : 
     194             : /*
     195             :  * Filter out the partitions that have any ancestor also present in the list.
     196             :  */
     197             : static void
     198         352 : filter_partitions(List *table_infos)
     199             : {
     200             :     ListCell   *lc;
     201             : 
     202        1034 :     foreach(lc, table_infos)
     203             :     {
     204         682 :         bool        skip = false;
     205         682 :         List       *ancestors = NIL;
     206             :         ListCell   *lc2;
     207         682 :         published_rel *table_info = (published_rel *) lfirst(lc);
     208             : 
     209         682 :         if (get_rel_relispartition(table_info->relid))
     210         202 :             ancestors = get_partition_ancestors(table_info->relid);
     211             : 
     212         804 :         foreach(lc2, ancestors)
     213             :         {
     214         202 :             Oid         ancestor = lfirst_oid(lc2);
     215             : 
     216         202 :             if (is_ancestor_member_tableinfos(ancestor, table_infos))
     217             :             {
     218          80 :                 skip = true;
     219          80 :                 break;
     220             :             }
     221             :         }
     222             : 
     223         682 :         if (skip)
     224          80 :             table_infos = foreach_delete_current(table_infos, lc);
     225             :     }
     226         352 : }
     227             : 
     228             : /*
     229             :  * Returns true if any schema is associated with the publication, i.e. if
     230             :  * pg_publication_namespace has at least one row for pubid; false otherwise.
     231             :  */
     232             : bool
     233         268 : is_schema_publication(Oid pubid)
     234             : {
     235             :     Relation    pubschsrel;
     236             :     ScanKeyData scankey;
     237             :     SysScanDesc scan;
     238             :     HeapTuple   tup;
     239         268 :     bool        result = false;
     240             : 
     241         268 :     pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
     242         268 :     ScanKeyInit(&scankey,
     243             :                 Anum_pg_publication_namespace_pnpubid,
     244             :                 BTEqualStrategyNumber, F_OIDEQ,
     245             :                 ObjectIdGetDatum(pubid));
     246             : 
     247         268 :     scan = systable_beginscan(pubschsrel,
     248             :                               PublicationNamespacePnnspidPnpubidIndexId,
     249             :                               true, NULL, 1, &scankey);
     250         268 :     tup = systable_getnext(scan);
     251         268 :     result = HeapTupleIsValid(tup);
     252             : 
     253         268 :     systable_endscan(scan);
     254         268 :     table_close(pubschsrel, AccessShareLock);
     255             : 
     256         268 :     return result;
     257             : }
     258             : 
     259             : /*
     260             :  * Returns true if the relation has a column list associated with the
     261             :  * publication, false otherwise (always false if pub->alltables is set).
     262             :  *
     263             :  * If a column list is found and cols is provided, the listed columns are added
     264             :  * to the bitmap *cols (which the caller must therefore have initialized).  The
     265             :  * bitmap is constructed within the given memory context (mcxt).
     266             :  */
     267             : bool
     268        1482 : check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt,
     269             :                             Bitmapset **cols)
     270             : {
     271             :     HeapTuple   cftuple;
     272        1482 :     bool        found = false;
     273             : 
     274        1482 :     if (pub->alltables)
     275         354 :         return false;
     276             : 
     277        1128 :     cftuple = SearchSysCache2(PUBLICATIONRELMAP,
     278             :                               ObjectIdGetDatum(relid),
     279             :                               ObjectIdGetDatum(pub->oid));
     280        1128 :     if (HeapTupleIsValid(cftuple))
     281             :     {
     282             :         Datum       cfdatum;
     283             :         bool        isnull;
     284             : 
     285             :         /* Look up the column list attribute. */
     286        1026 :         cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
     287             :                                   Anum_pg_publication_rel_prattrs, &isnull);
     288             : 
     289             :         /* Was a column list found? */
     290        1026 :         if (!isnull)
     291             :         {
     292             :             /* Build the column list bitmap in the given memory context. */
     293         314 :             if (cols)
     294         308 :                 *cols = pub_collist_to_bitmapset(*cols, cfdatum, mcxt);
     295             : 
     296         314 :             found = true;
     297             :         }
     298             : 
     299        1026 :         ReleaseSysCache(cftuple);
     300             :     }
     301             : 
     302        1128 :     return found;
     303             : }
     304             : 
     305             : /*
     306             :  * Appends to result the relation OIDs selected for relid by the publication
     307             :  * partition option pub_partopt, and returns the resulting list.
     308             :  */
     309             : List *
     310        5646 : GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
     311             :                                Oid relid)
     312             : {
     313        5646 :     if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
     314             :         pub_partopt != PUBLICATION_PART_ROOT)
     315        1140 :     {
     316        1140 :         List       *all_parts = find_all_inheritors(relid, NoLock,
     317             :                                                     NULL);
     318             : 
     319        1140 :         if (pub_partopt == PUBLICATION_PART_ALL)
     320         938 :             result = list_concat(result, all_parts);
     321         202 :         else if (pub_partopt == PUBLICATION_PART_LEAF)
     322             :         {
     323             :             ListCell   *lc;
     324             : 
     325         738 :             foreach(lc, all_parts)
     326             :             {
     327         536 :                 Oid         partOid = lfirst_oid(lc);
     328             : 
     329         536 :                 if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
     330         334 :                     result = lappend_oid(result, partOid);
     331             :             }
     332             :         }
     333             :         else
     334             :             Assert(false);
     335             :     }
     336             :     else
     337        4506 :         result = lappend_oid(result, relid);
     338             : 
     339        5646 :     return result;
     340             : }
     341             : 
     342             : /*
     343             :  * Returns the relid of the topmost ancestor that is published via this
     344             :  * publication, if any, and stores its level in *ancestor_level when that
     345             :  * pointer is not NULL; otherwise returns InvalidOid.
     346             :  *
     347             :  * The ancestor_level value allows us to compare the results for multiple
     348             :  * publications, and decide which value is higher up.
     349             :  *
     350             :  * Note that the list of ancestors should be ordered such that the topmost
     351             :  * ancestor is at the end of the list.
     352             :  */
     353             : Oid
     354         510 : GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
     355             : {
     356             :     ListCell   *lc;
     357         510 :     Oid         topmost_relid = InvalidOid;
     358         510 :     int         level = 0;
     359             : 
     360             :     /*
     361             :      * Find the "topmost" ancestor published here, directly or via its schema.
     362             :      */
     363        1036 :     foreach(lc, ancestors)
     364             :     {
     365         526 :         Oid         ancestor = lfirst_oid(lc);
     366         526 :         List       *apubids = GetRelationPublications(ancestor);
     367         526 :         List       *aschemaPubids = NIL;
     368             : 
     369         526 :         level++;
     370             : 
     371         526 :         if (list_member_oid(apubids, puboid))
     372             :         {
     373         312 :             topmost_relid = ancestor;
     374             : 
     375         312 :             if (ancestor_level)
     376          86 :                 *ancestor_level = level;
     377             :         }
     378             :         else
     379             :         {
     380         214 :             aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
     381         214 :             if (list_member_oid(aschemaPubids, puboid))
     382             :             {
     383          10 :                 topmost_relid = ancestor;
     384             : 
     385          10 :                 if (ancestor_level)
     386          10 :                     *ancestor_level = level;
     387             :             }
     388             :         }
     389             : 
     390         526 :         list_free(apubids);
     391         526 :         list_free(aschemaPubids);
     392             :     }
     393             : 
     394         510 :     return topmost_relid;
     395             : }
     396             : 
     397             : /*
     398             :  * attnumstoint2vector
     399             :  *      Convert a Bitmapset of AttrNumbers into an int2vector.
     400             :  *
     401             :  * The attribute numbers in the set are 0-based, i.e., not offset by
     402             :  * FirstLowInvalidHeapAttributeNumber.
     403             :  */
     404             : static int2vector *
     405         308 : attnumstoint2vector(Bitmapset *attrs)
     406             : {
     407             :     int2vector *result;
     408         308 :     int         n = bms_num_members(attrs);
     409         308 :     int         i = -1;
     410         308 :     int         j = 0;
     411             : 
     412         308 :     result = buildint2vector(NULL, n);
     413             : 
     414         848 :     while ((i = bms_next_member(attrs, i)) >= 0)
     415             :     {
     416             :         Assert(i <= PG_INT16_MAX);
     417             : 
     418         540 :         result->values[j++] = (int16) i;
     419             :     }
     420             : 
     421         308 :     return result;
     422             : }
     423             : 
     424             : /*
     425             :  * Insert a new publication / relation mapping into pg_publication_rel.
     426             :  */
     427             : ObjectAddress
     428        1092 : publication_add_relation(Oid pubid, PublicationRelInfo *pri,
     429             :                          bool if_not_exists)
     430             : {
     431             :     Relation    rel;
     432             :     HeapTuple   tup;
     433             :     Datum       values[Natts_pg_publication_rel];
     434             :     bool        nulls[Natts_pg_publication_rel];
     435        1092 :     Relation    targetrel = pri->relation;
     436        1092 :     Oid         relid = RelationGetRelid(targetrel);
     437             :     Oid         pubreloid;
     438             :     Bitmapset  *attnums;
     439        1092 :     Publication *pub = GetPublication(pubid);
     440             :     ObjectAddress myself,
     441             :                 referenced;
     442        1092 :     List       *relids = NIL;
     443             :     int         i;
     444             : 
     445        1092 :     rel = table_open(PublicationRelRelationId, RowExclusiveLock);
     446             : 
     447             :     /*
     448             :      * Check for duplicates. Note that this does not really prevent
     449             :      * duplicates, it's here just to provide a nicer error message in the
     450             :      * common case. The real protection is the unique key on the catalog.
     451             :      */
     452        1092 :     if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
     453             :                               ObjectIdGetDatum(pubid)))
     454             :     {
     455          22 :         table_close(rel, RowExclusiveLock);
     456             : 
     457          22 :         if (if_not_exists)
     458          16 :             return InvalidObjectAddress;
     459             : 
     460           6 :         ereport(ERROR,
     461             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     462             :                  errmsg("relation \"%s\" is already member of publication \"%s\"",
     463             :                         RelationGetRelationName(targetrel), pub->name)));
     464             :     }
     465             : 
     466        1070 :     check_publication_add_relation(targetrel);
     467             : 
     468             :     /* Validate and translate column names into a Bitmapset of attnums. */
     469        1038 :     attnums = pub_collist_validate(pri->relation, pri->columns);
     470             : 
     471             :     /* Form a tuple. */
     472        1014 :     memset(values, 0, sizeof(values));
     473        1014 :     memset(nulls, false, sizeof(nulls));
     474             : 
     475        1014 :     pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
     476             :                                    Anum_pg_publication_rel_oid);
     477        1014 :     values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid);
     478        1014 :     values[Anum_pg_publication_rel_prpubid - 1] =
     479        1014 :         ObjectIdGetDatum(pubid);
     480        1014 :     values[Anum_pg_publication_rel_prrelid - 1] =
     481        1014 :         ObjectIdGetDatum(relid);
     482             : 
     483             :     /* Add the row filter (qualification), if available */
     484        1014 :     if (pri->whereClause != NULL)
     485         306 :         values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
     486             :     else
     487         708 :         nulls[Anum_pg_publication_rel_prqual - 1] = true;
     488             : 
     489             :     /* Add the column list, if available */
     490        1014 :     if (pri->columns)
     491         308 :         values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(attnumstoint2vector(attnums));
     492             :     else
     493         706 :         nulls[Anum_pg_publication_rel_prattrs - 1] = true;
     494             : 
     495        1014 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     496             : 
     497             :     /* Insert the tuple into the catalog. */
     498        1014 :     CatalogTupleInsert(rel, tup);
     499        1014 :     heap_freetuple(tup);
     500             : 
     501             :     /* Register dependencies as needed */
     502        1014 :     ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
     503             : 
     504             :     /* Add dependency on the publication */
     505        1014 :     ObjectAddressSet(referenced, PublicationRelationId, pubid);
     506        1014 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     507             : 
     508             :     /* Add dependency on the relation */
     509        1014 :     ObjectAddressSet(referenced, RelationRelationId, relid);
     510        1014 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     511             : 
     512             :     /* Add dependency on the objects mentioned in the qualifications */
     513        1014 :     if (pri->whereClause)
     514         306 :         recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
     515             :                                         DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
     516             :                                         false);
     517             : 
     518             :     /* Add dependency on the columns, if any are listed */
     519        1014 :     i = -1;
     520        1554 :     while ((i = bms_next_member(attnums, i)) >= 0)
     521             :     {
     522         540 :         ObjectAddressSubSet(referenced, RelationRelationId, relid, i);
     523         540 :         recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
     524             :     }
     525             : 
     526             :     /* Close the table. */
     527        1014 :     table_close(rel, RowExclusiveLock);
     528             : 
     529             :     /*
     530             :      * Invalidate relcache so that publication info is rebuilt.
     531             :      *
     532             :      * For partitioned tables, we must invalidate all partitions contained
     533             :      * in the respective partition hierarchies, not just the one explicitly
     534             :      * mentioned in the publication. This is required because we implicitly
     535             :      * publish the child tables when the parent table is published.
     536             :      */
     537        1014 :     relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
     538             :                                             relid);
     539             : 
     540        1014 :     InvalidatePublicationRels(relids);
     541             : 
     542        1014 :     return myself;
     543             : }
     544             : 
     545             : /*
     546             :  * pub_collist_validate
     547             :  *      Process and validate the 'columns' list (of column names) and ensure
     548             :  *      the columns are all valid to use for a publication.  Raises an ERROR
     549             :  *      for any unknown columns, system columns, duplicate columns, or virtual
     550             :  *      generated columns.
     551             :  *
     552             :  * Looks up each column's attnum and returns a 0-based Bitmapset of the
     553             :  * corresponding attnums.
     554             :  */
     555             : Bitmapset *
     556        1442 : pub_collist_validate(Relation targetrel, List *columns)
     557             : {
     558        1442 :     Bitmapset  *set = NULL;
     559             :     ListCell   *lc;
     560        1442 :     TupleDesc   tupdesc = RelationGetDescr(targetrel);
     561             : 
     562        2254 :     foreach(lc, columns)
     563             :     {
     564         848 :         char       *colname = strVal(lfirst(lc));
     565         848 :         AttrNumber  attnum = get_attnum(RelationGetRelid(targetrel), colname);
     566             : 
     567         848 :         if (attnum == InvalidAttrNumber)
     568           6 :             ereport(ERROR,
     569             :                     errcode(ERRCODE_UNDEFINED_COLUMN),
     570             :                     errmsg("column \"%s\" of relation \"%s\" does not exist",
     571             :                            colname, RelationGetRelationName(targetrel)));
     572             : 
     573         842 :         if (!AttrNumberIsForUserDefinedAttr(attnum))
     574          12 :             ereport(ERROR,
     575             :                     errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     576             :                     errmsg("cannot use system column \"%s\" in publication column list",
     577             :                            colname));
     578             : 
     579         830 :         if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
     580           6 :             ereport(ERROR,
     581             :                     errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     582             :                     errmsg("cannot use virtual generated column \"%s\" in publication column list",
     583             :                            colname));
     584             : 
     585         824 :         if (bms_is_member(attnum, set))
     586          12 :             ereport(ERROR,
     587             :                     errcode(ERRCODE_DUPLICATE_OBJECT),
     588             :                     errmsg("duplicate column \"%s\" in publication column list",
     589             :                            colname));
     590             : 
     591         812 :         set = bms_add_member(set, attnum);
     592             :     }
     593             : 
     594        1406 :     return set;
     595             : }
     596             : 
     597             : /*
     598             :  * Transform a column list (represented by an array Datum) to a bitmapset.
     599             :  *
     600             :  * If columns isn't NULL, add the column numbers to that set.
     601             :  *
     602             :  * If mcxt isn't NULL, build the bitmapset in that context.
     603             :  */
     604             : Bitmapset *
     605         444 : pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
     606             : {
     607         444 :     Bitmapset  *result = columns;
     608             :     ArrayType  *arr;
     609             :     int         nelems;
     610             :     int16      *elems;
     611         444 :     MemoryContext oldcxt = NULL;
     612             : 
     613         444 :     arr = DatumGetArrayTypeP(pubcols);
     614         444 :     nelems = ARR_DIMS(arr)[0];
     615         444 :     elems = (int16 *) ARR_DATA_PTR(arr);
     616             : 
     617             :     /* If a memory context was specified, switch to it. */
     618         444 :     if (mcxt)
     619          78 :         oldcxt = MemoryContextSwitchTo(mcxt);
     620             : 
     621        1226 :     for (int i = 0; i < nelems; i++)
     622         782 :         result = bms_add_member(result, elems[i]);
     623             : 
     624         444 :     if (mcxt)
     625          78 :         MemoryContextSwitchTo(oldcxt);
     626             : 
     627         444 :     return result;
     628             : }
     629             : 
     630             : /*
     631             :  * Returns a bitmap representing the columns of the specified table.
     632             :  *
     633             :  * Dropped columns are skipped.  Only STORED generated columns can be
     634             :  * included, and only if include_gencols_type is PUBLISH_GENCOLS_STORED.
     635             :  */
     636             : Bitmapset *
     637          18 : pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type)
     638             : {
     639          18 :     Bitmapset  *result = NULL;
     640          18 :     TupleDesc   desc = RelationGetDescr(relation);
     641             : 
     642          60 :     for (int i = 0; i < desc->natts; i++)
     643             :     {
     644          42 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     645             : 
     646          42 :         if (att->attisdropped)
     647           2 :             continue;
     648             : 
     649          40 :         if (att->attgenerated)
     650             :         {
     651             :             /* We only support replication of STORED generated cols. */
     652           4 :             if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
     653           2 :                 continue;
     654             : 
     655             :             /* User hasn't requested to replicate STORED generated cols. */
     656           2 :             if (include_gencols_type != PUBLISH_GENCOLS_STORED)
     657           2 :                 continue;
     658             :         }
     659             : 
     660          36 :         result = bms_add_member(result, att->attnum);
     661             :     }
     662             : 
     663          18 :     return result;
     664             : }
     665             : 
     666             : /*
     667             :  * Insert new publication / schema mapping.
     668             :  */
     669             : ObjectAddress
     670         222 : publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
     671             : {
     672             :     Relation    rel;
     673             :     HeapTuple   tup;
     674             :     Datum       values[Natts_pg_publication_namespace];
     675             :     bool        nulls[Natts_pg_publication_namespace];
     676             :     Oid         psschid;
     677         222 :     Publication *pub = GetPublication(pubid);
     678         222 :     List       *schemaRels = NIL;
     679             :     ObjectAddress myself,
     680             :                 referenced;
     681             : 
     682         222 :     rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
     683             : 
     684             :     /*
     685             :      * Check for duplicates. Note that this does not really prevent
     686             :      * duplicates; it's here just to provide a nicer error message in the
     687             :      * common case. The real protection is the unique key on the catalog.
     688             :      */
     689         222 :     if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
     690             :                               ObjectIdGetDatum(schemaid),
     691             :                               ObjectIdGetDatum(pubid)))
     692             :     {
     693          18 :         table_close(rel, RowExclusiveLock);
     694             : 
     695          18 :         if (if_not_exists)
     696          12 :             return InvalidObjectAddress;
     697             : 
     698           6 :         ereport(ERROR,
     699             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     700             :                  errmsg("schema \"%s\" is already member of publication \"%s\"",
     701             :                         get_namespace_name(schemaid), pub->name)));
     702             :     }
     703             : 
     704         204 :     check_publication_add_schema(schemaid);
     705             : 
     706             :     /* Form a tuple */
     707         198 :     memset(values, 0, sizeof(values));
     708         198 :     memset(nulls, false, sizeof(nulls));
     709             : 
     710         198 :     psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId,
     711             :                                  Anum_pg_publication_namespace_oid);
     712         198 :     values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid);
     713         198 :     values[Anum_pg_publication_namespace_pnpubid - 1] =
     714         198 :         ObjectIdGetDatum(pubid);
     715         198 :     values[Anum_pg_publication_namespace_pnnspid - 1] =
     716         198 :         ObjectIdGetDatum(schemaid);
     717             : 
     718         198 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     719             : 
     720             :     /* Insert tuple into catalog */
     721         198 :     CatalogTupleInsert(rel, tup);
     722         198 :     heap_freetuple(tup);
     723             : 
     724         198 :     ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid);
     725             : 
     726             :     /* Add dependency on the publication */
     727         198 :     ObjectAddressSet(referenced, PublicationRelationId, pubid);
     728         198 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     729             : 
     730             :     /* Add dependency on the schema */
     731         198 :     ObjectAddressSet(referenced, NamespaceRelationId, schemaid);
     732         198 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     733             : 
     734             :     /* Close the table */
     735         198 :     table_close(rel, RowExclusiveLock);
     736             : 
     737             :     /*
     738             :      * Invalidate relcache so that publication info is rebuilt. See
     739             :      * publication_add_relation for why we need to consider all the
     740             :      * partitions.
     741             :      */
     742         198 :     schemaRels = GetSchemaPublicationRelations(schemaid,
     743             :                                                PUBLICATION_PART_ALL);
     744         198 :     InvalidatePublicationRels(schemaRels);
     745             : 
     746         198 :     return myself;
     747             : }
     748             : 
     749             : /* Gets list of publication oids for a relation */
     750             : List *
     751       11914 : GetRelationPublications(Oid relid)
     752             : {
     753       11914 :     List       *result = NIL;
     754             :     CatCList   *pubrellist;
     755             :     int         i;
     756             : 
     757             :     /* Find all publications associated with the relation. */
     758       11914 :     pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
     759             :                                      ObjectIdGetDatum(relid));
     760       13466 :     for (i = 0; i < pubrellist->n_members; i++)
     761             :     {
     762        1552 :         HeapTuple   tup = &pubrellist->members[i]->tuple;
     763        1552 :         Oid         pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
     764             : 
     765        1552 :         result = lappend_oid(result, pubid);
     766             :     }
     767             : 
     768       11914 :     ReleaseSysCacheList(pubrellist);
     769             : 
     770       11914 :     return result;
     771             : }
     772             : 
     773             : /*
     774             :  * Gets list of relation oids for a publication.
     775             :  *
     776             :  * This should only be used for FOR TABLE publications; FOR ALL TABLES
     777             :  * publications should use GetAllTablesPublicationRelations().
     778             :  */
     779             : List *
     780        2260 : GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
     781             : {
     782             :     List       *result;
     783             :     Relation    pubrelsrel;
     784             :     ScanKeyData scankey;
     785             :     SysScanDesc scan;
     786             :     HeapTuple   tup;
     787             : 
     788             :     /* Find all relations associated with the publication. */
     789        2260 :     pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
     790             : 
     791        2260 :     ScanKeyInit(&scankey,
     792             :                 Anum_pg_publication_rel_prpubid,
     793             :                 BTEqualStrategyNumber, F_OIDEQ,
     794             :                 ObjectIdGetDatum(pubid));
     795             : 
     796        2260 :     scan = systable_beginscan(pubrelsrel, PublicationRelPrpubidIndexId,
     797             :                               true, NULL, 1, &scankey);
     798             : 
     799        2260 :     result = NIL;
     800        5284 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     801             :     {
     802             :         Form_pg_publication_rel pubrel;
     803             : 
     804        3024 :         pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
     805        3024 :         result = GetPubPartitionOptionRelations(result, pub_partopt,
     806             :                                                 pubrel->prrelid);
     807             :     }
     808             : 
     809        2260 :     systable_endscan(scan);
     810        2260 :     table_close(pubrelsrel, AccessShareLock);
     811             : 
     812             :     /* Now sort and de-duplicate the result list */
     813        2260 :     list_sort(result, list_oid_cmp);
     814        2260 :     list_deduplicate_oid(result);
     815             : 
     816        2260 :     return result;
     817             : }
     818             : 
     819             : /*
     820             :  * Gets list of publication oids for publications marked as FOR ALL TABLES.
     821             :  */
     822             : List *
     823        8106 : GetAllTablesPublications(void)
     824             : {
     825             :     List       *result;
     826             :     Relation    rel;
     827             :     ScanKeyData scankey;
     828             :     SysScanDesc scan;
     829             :     HeapTuple   tup;
     830             : 
     831             :     /* Find all publications that are marked as for all tables. */
     832        8106 :     rel = table_open(PublicationRelationId, AccessShareLock);
     833             : 
     834        8106 :     ScanKeyInit(&scankey,
     835             :                 Anum_pg_publication_puballtables,
     836             :                 BTEqualStrategyNumber, F_BOOLEQ,
     837             :                 BoolGetDatum(true));
     838             : 
     839        8106 :     scan = systable_beginscan(rel, InvalidOid, false,
     840             :                               NULL, 1, &scankey);
     841             : 
     842        8106 :     result = NIL;
     843        8298 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     844             :     {
     845         192 :         Oid         oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
     846             : 
     847         192 :         result = lappend_oid(result, oid);
     848             :     }
     849             : 
     850        8106 :     systable_endscan(scan);
     851        8106 :     table_close(rel, AccessShareLock);
     852             : 
     853        8106 :     return result;
     854             : }
     855             : 
     856             : /*
     857             :  * Gets list of all relations published by FOR ALL TABLES publication(s).
     858             :  *
     859             :  * If the publication publishes partition changes via their respective root
     860             :  * partitioned tables, we must exclude partitions in favor of including the
     861             :  * root partitioned tables.
     862             :  */
     863             : List *
     864         332 : GetAllTablesPublicationRelations(bool pubviaroot)
     865             : {
     866             :     Relation    classRel;
     867             :     ScanKeyData key[1];
     868             :     TableScanDesc scan;
     869             :     HeapTuple   tuple;
     870         332 :     List       *result = NIL;
     871             : 
     872         332 :     classRel = table_open(RelationRelationId, AccessShareLock);
     873             : 
     874         332 :     ScanKeyInit(&key[0],
     875             :                 Anum_pg_class_relkind,
     876             :                 BTEqualStrategyNumber, F_CHAREQ,
     877             :                 CharGetDatum(RELKIND_RELATION));
     878             : 
     879         332 :     scan = table_beginscan_catalog(classRel, 1, key);
     880             : 
     881       24480 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
     882             :     {
     883       24148 :         Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
     884       24148 :         Oid         relid = relForm->oid;
     885             : 
     886       24148 :         if (is_publishable_class(relid, relForm) &&
     887        1560 :             !(relForm->relispartition && pubviaroot))
     888        1378 :             result = lappend_oid(result, relid);
     889             :     }
     890             : 
     891         332 :     table_endscan(scan);
     892             : 
     893         332 :     if (pubviaroot)
     894             :     {
     895          26 :         ScanKeyInit(&key[0],
     896             :                     Anum_pg_class_relkind,
     897             :                     BTEqualStrategyNumber, F_CHAREQ,
     898             :                     CharGetDatum(RELKIND_PARTITIONED_TABLE));
     899             : 
     900          26 :         scan = table_beginscan_catalog(classRel, 1, key);
     901             : 
     902         156 :         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
     903             :         {
     904         130 :             Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
     905         130 :             Oid         relid = relForm->oid;
     906             : 
     907         130 :             if (is_publishable_class(relid, relForm) &&
     908         130 :                 !relForm->relispartition)
     909         104 :                 result = lappend_oid(result, relid);
     910             :         }
     911             : 
     912          26 :         table_endscan(scan);
     913             :     }
     914             : 
     915         332 :     table_close(classRel, AccessShareLock);
     916         332 :     return result;
     917             : }
     918             : 
     919             : /*
     920             :  * Gets the list of schema oids for a publication.
     921             :  *
     922             :  * This should only be used for FOR TABLES IN SCHEMA publications.
     923             :  */
     924             : List *
     925        2178 : GetPublicationSchemas(Oid pubid)
     926             : {
     927        2178 :     List       *result = NIL;
     928             :     Relation    pubschsrel;
     929             :     ScanKeyData scankey;
     930             :     SysScanDesc scan;
     931             :     HeapTuple   tup;
     932             : 
     933             :     /* Find all schemas associated with the publication */
     934        2178 :     pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
     935             : 
     936        2178 :     ScanKeyInit(&scankey,
     937             :                 Anum_pg_publication_namespace_pnpubid,
     938             :                 BTEqualStrategyNumber, F_OIDEQ,
     939             :                 ObjectIdGetDatum(pubid));
     940             : 
     941        2178 :     scan = systable_beginscan(pubschsrel,
     942             :                               PublicationNamespacePnnspidPnpubidIndexId,
     943             :                               true, NULL, 1, &scankey);
     944        2278 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     945             :     {
     946             :         Form_pg_publication_namespace pubsch;
     947             : 
     948         100 :         pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
     949             : 
     950         100 :         result = lappend_oid(result, pubsch->pnnspid);
     951             :     }
     952             : 
     953        2178 :     systable_endscan(scan);
     954        2178 :     table_close(pubschsrel, AccessShareLock);
     955             : 
     956        2178 :     return result;
     957             : }
     958             : 
     959             : /*
     960             :  * Gets the list of publication oids associated with a specified schema.
     961             :  */
     962             : List *
     963       11498 : GetSchemaPublications(Oid schemaid)
     964             : {
     965       11498 :     List       *result = NIL;
     966             :     CatCList   *pubschlist;
     967             :     int         i;
     968             : 
     969             :     /* Find all publications associated with the schema */
     970       11498 :     pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
     971             :                                      ObjectIdGetDatum(schemaid));
     972       11612 :     for (i = 0; i < pubschlist->n_members; i++)
     973             :     {
     974         114 :         HeapTuple   tup = &pubschlist->members[i]->tuple;
     975         114 :         Oid         pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
     976             : 
     977         114 :         result = lappend_oid(result, pubid);
     978             :     }
     979             : 
     980       11498 :     ReleaseSysCacheList(pubschlist);
     981             : 
     982       11498 :     return result;
     983             : }
     984             : 
     985             : /*
     986             :  * Get the list of publishable relation oids for a specified schema.
     987             :  */
     988             : List *
     989         460 : GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
     990             : {
     991             :     Relation    classRel;
     992             :     ScanKeyData key[1];
     993             :     TableScanDesc scan;
     994             :     HeapTuple   tuple;
     995         460 :     List       *result = NIL;
     996             : 
     997             :     Assert(OidIsValid(schemaid));
     998             : 
     999         460 :     classRel = table_open(RelationRelationId, AccessShareLock);
    1000             : 
    1001         460 :     ScanKeyInit(&key[0],
    1002             :                 Anum_pg_class_relnamespace,
    1003             :                 BTEqualStrategyNumber, F_OIDEQ,
    1004             :                 schemaid);
    1005             : 
    1006             :     /* get all the relations present in the specified schema */
    1007         460 :     scan = table_beginscan_catalog(classRel, 1, key);
    1008       25004 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    1009             :     {
    1010       24544 :         Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
    1011       24544 :         Oid         relid = relForm->oid;
    1012             :         char        relkind;
    1013             : 
    1014       24544 :         if (!is_publishable_class(relid, relForm))
    1015       10244 :             continue;
    1016             : 
    1017       14300 :         relkind = get_rel_relkind(relid);
    1018       14300 :         if (relkind == RELKIND_RELATION)
    1019       13566 :             result = lappend_oid(result, relid);
    1020         734 :         else if (relkind == RELKIND_PARTITIONED_TABLE)
    1021             :         {
    1022         734 :             List       *partitionrels = NIL;
    1023             : 
    1024             :             /*
    1025             :              * It is quite possible that some of the partitions are in a
    1026             :              * different schema than the parent table, so we need to get such
    1027             :              * partitions separately.
    1028             :              */
    1029         734 :             partitionrels = GetPubPartitionOptionRelations(partitionrels,
    1030             :                                                            pub_partopt,
    1031             :                                                            relForm->oid);
    1032         734 :             result = list_concat_unique_oid(result, partitionrels);
    1033             :         }
    1034             :     }
    1035             : 
    1036         460 :     table_endscan(scan);
    1037         460 :     table_close(classRel, AccessShareLock);
    1038         460 :     return result;
    1039             : }
    1040             : 
    1041             : /*
    1042             :  * Gets the list of all relations published by a FOR TABLES IN SCHEMA
    1043             :  * publication.
    1044             :  */
    1045             : List *
    1046        1774 : GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
    1047             : {
    1048        1774 :     List       *result = NIL;
    1049        1774 :     List       *pubschemalist = GetPublicationSchemas(pubid);
    1050             :     ListCell   *cell;
    1051             : 
    1052        1844 :     foreach(cell, pubschemalist)
    1053             :     {
    1054          70 :         Oid         schemaid = lfirst_oid(cell);
    1055          70 :         List       *schemaRels = NIL;
    1056             : 
    1057          70 :         schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt);
    1058          70 :         result = list_concat(result, schemaRels);
    1059             :     }
    1060             : 
    1061        1774 :     return result;
    1062             : }
    1063             : 
    1064             : /*
    1065             :  * Get publication using oid
    1066             :  *
    1067             :  * The Publication struct and its data are palloc'ed here.
    1068             :  */
    1069             : Publication *
    1070        8710 : GetPublication(Oid pubid)
    1071             : {
    1072             :     HeapTuple   tup;
    1073             :     Publication *pub;
    1074             :     Form_pg_publication pubform;
    1075             : 
    1076        8710 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1077        8710 :     if (!HeapTupleIsValid(tup))
    1078           0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
    1079             : 
    1080        8710 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1081             : 
    1082        8710 :     pub = (Publication *) palloc(sizeof(Publication));
    1083        8710 :     pub->oid = pubid;
    1084        8710 :     pub->name = pstrdup(NameStr(pubform->pubname));
    1085        8710 :     pub->alltables = pubform->puballtables;
    1086        8710 :     pub->pubactions.pubinsert = pubform->pubinsert;
    1087        8710 :     pub->pubactions.pubupdate = pubform->pubupdate;
    1088        8710 :     pub->pubactions.pubdelete = pubform->pubdelete;
    1089        8710 :     pub->pubactions.pubtruncate = pubform->pubtruncate;
    1090        8710 :     pub->pubviaroot = pubform->pubviaroot;
    1091        8710 :     pub->pubgencols_type = pubform->pubgencols;
    1092             : 
    1093        8710 :     ReleaseSysCache(tup);
    1094             : 
    1095        8710 :     return pub;
    1096             : }
    1097             : 
    1098             : /*
    1099             :  * Get Publication using name.
    1100             :  */
    1101             : Publication *
    1102        2436 : GetPublicationByName(const char *pubname, bool missing_ok)
    1103             : {
    1104             :     Oid         oid;
    1105             : 
    1106        2436 :     oid = get_publication_oid(pubname, missing_ok);
    1107             : 
    1108        2434 :     return OidIsValid(oid) ? GetPublication(oid) : NULL;
    1109             : }
    1110             : 
    1111             : /*
    1112             :  * Get information of the tables in the given publication array.
    1113             :  *
    1114             :  * Returns pubid, relid, column list, row filter for each table.
    1115             :  */
    1116             : Datum
    1117        5980 : pg_get_publication_tables(PG_FUNCTION_ARGS)
    1118             : {
    1119             : #define NUM_PUBLICATION_TABLES_ELEM 4
    1120             :     FuncCallContext *funcctx;
    1121        5980 :     List       *table_infos = NIL;
    1122             : 
    1123             :     /* stuff done only on the first call of the function */
    1124        5980 :     if (SRF_IS_FIRSTCALL())
    1125             :     {
    1126             :         TupleDesc   tupdesc;
    1127             :         MemoryContext oldcontext;
    1128             :         ArrayType  *arr;
    1129             :         Datum      *elems;
    1130             :         int         nelems,
    1131             :                     i;
    1132        1920 :         bool        viaroot = false;
    1133             : 
    1134             :         /* create a function context for cross-call persistence */
    1135        1920 :         funcctx = SRF_FIRSTCALL_INIT();
    1136             : 
    1137             :         /* switch to memory context appropriate for multiple function calls */
    1138        1920 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    1139             : 
    1140             :         /*
    1141             :          * Deconstruct the parameter into elements where each element is a
    1142             :          * publication name.
    1143             :          */
    1144        1920 :         arr = PG_GETARG_ARRAYTYPE_P(0);
    1145        1920 :         deconstruct_array_builtin(arr, TEXTOID, &elems, NULL, &nelems);
    1146             : 
    1147             :         /* Get Oids of tables from each publication. */
    1148        3930 :         for (i = 0; i < nelems; i++)
    1149             :         {
    1150             :             Publication *pub_elem;
    1151        2010 :             List       *pub_elem_tables = NIL;
    1152             :             ListCell   *lc;
    1153             : 
    1154        2010 :             pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]), false);
    1155             : 
    1156             :             /*
    1157             :              * Publications support partitioned tables. If
    1158             :              * publish_via_partition_root is false, all changes are replicated
    1159             :              * using leaf partition identity and schema, so we only need
    1160             :              * those. Otherwise, get the partitioned table itself.
    1161             :              */
    1162        2010 :             if (pub_elem->alltables)
    1163         332 :                 pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot);
    1164             :             else
    1165             :             {
    1166             :                 List       *relids,
    1167             :                            *schemarelids;
    1168             : 
    1169        1678 :                 relids = GetPublicationRelations(pub_elem->oid,
    1170        1678 :                                                  pub_elem->pubviaroot ?
    1171        1678 :                                                  PUBLICATION_PART_ROOT :
    1172             :                                                  PUBLICATION_PART_LEAF);
    1173        1678 :                 schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
    1174        1678 :                                                                 pub_elem->pubviaroot ?
    1175        1678 :                                                                 PUBLICATION_PART_ROOT :
    1176             :                                                                 PUBLICATION_PART_LEAF);
    1177        1678 :                 pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
    1178             :             }
    1179             : 
    1180             :             /*
    1181             :              * Record the published table and the corresponding publication so
    1182             :              * that we can get row filters and column lists later.
    1183             :              *
    1184             :              * When a table is published by multiple publications, to obtain
    1185             :              * all row filters and column lists, the structure related to this
    1186             :              * table will be recorded multiple times.
    1187             :              */
    1188        6150 :             foreach(lc, pub_elem_tables)
    1189             :             {
    1190        4140 :                 published_rel *table_info = (published_rel *) palloc(sizeof(published_rel));
    1191             : 
    1192        4140 :                 table_info->relid = lfirst_oid(lc);
    1193        4140 :                 table_info->pubid = pub_elem->oid;
    1194        4140 :                 table_infos = lappend(table_infos, table_info);
    1195             :             }
    1196             : 
    1197             :             /* At least one publication is using publish_via_partition_root. */
    1198        2010 :             if (pub_elem->pubviaroot)
    1199         368 :                 viaroot = true;
    1200             :         }
    1201             : 
    1202             :         /*
    1203             :          * If the publication publishes partition changes via their respective
    1204             :          * root partitioned tables, we must exclude partitions in favor of
    1205             :          * including the root partitioned tables. Otherwise, the function
    1206             :          * could return both the child and parent tables which could cause
    1207             :          * data of the child table to be double-published on the subscriber
    1208             :          * side.
    1209             :          */
    1210        1920 :         if (viaroot)
    1211         352 :             filter_partitions(table_infos);
    1212             : 
    1213             :         /* Construct a tuple descriptor for the result rows. */
    1214        1920 :         tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
    1215        1920 :         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
    1216             :                            OIDOID, -1, 0);
    1217        1920 :         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
    1218             :                            OIDOID, -1, 0);
    1219        1920 :         TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
    1220             :                            INT2VECTOROID, -1, 0);
    1221        1920 :         TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
    1222             :                            PG_NODE_TREEOID, -1, 0);
    1223             : 
    1224        1920 :         funcctx->tuple_desc = BlessTupleDesc(tupdesc);
    1225        1920 :         funcctx->user_fctx = table_infos;
    1226             : 
    1227        1920 :         MemoryContextSwitchTo(oldcontext);
    1228             :     }
    1229             : 
    1230             :     /* stuff done on every call of the function */
    1231        5980 :     funcctx = SRF_PERCALL_SETUP();
    1232        5980 :     table_infos = (List *) funcctx->user_fctx;
    1233             : 
    1234        5980 :     if (funcctx->call_cntr < list_length(table_infos))
    1235             :     {
    1236        4060 :         HeapTuple   pubtuple = NULL;
    1237             :         HeapTuple   rettuple;
    1238             :         Publication *pub;
    1239        4060 :         published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
    1240        4060 :         Oid         relid = table_info->relid;
    1241        4060 :         Oid         schemaid = get_rel_namespace(relid);
    1242        4060 :         Datum       values[NUM_PUBLICATION_TABLES_ELEM] = {0};
    1243        4060 :         bool        nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
    1244             : 
    1245             :         /*
    1246             :          * Form tuple with appropriate data.
    1247             :          */
    1248             : 
    1249        4060 :         pub = GetPublication(table_info->pubid);
    1250             : 
    1251        4060 :         values[0] = ObjectIdGetDatum(pub->oid);
    1252        4060 :         values[1] = ObjectIdGetDatum(relid);
    1253             : 
    1254             :         /*
    1255             :          * We don't consider row filters or column lists for FOR ALL TABLES or
    1256             :          * FOR TABLES IN SCHEMA publications.
    1257             :          */
    1258        4060 :         if (!pub->alltables &&
    1259        2578 :             !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
    1260             :                                    ObjectIdGetDatum(schemaid),
    1261             :                                    ObjectIdGetDatum(pub->oid)))
    1262        2484 :             pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
    1263             :                                            ObjectIdGetDatum(relid),
    1264             :                                            ObjectIdGetDatum(pub->oid));
    1265             : 
    1266        4060 :         if (HeapTupleIsValid(pubtuple))
    1267             :         {
    1268             :             /* Lookup the column list attribute. */
    1269        2258 :             values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
    1270             :                                         Anum_pg_publication_rel_prattrs,
    1271             :                                         &(nulls[2]));
    1272             : 
    1273             :             /* Null indicates no filter. */
    1274        2258 :             values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
    1275             :                                         Anum_pg_publication_rel_prqual,
    1276             :                                         &(nulls[3]));
    1277             :         }
    1278             :         else
    1279             :         {
    1280        1802 :             nulls[2] = true;
    1281        1802 :             nulls[3] = true;
    1282             :         }
    1283             : 
    1284             :         /* Show all columns when the column list is not specified. */
    1285        4060 :         if (nulls[2])
    1286             :         {
    1287        3808 :             Relation    rel = table_open(relid, AccessShareLock);
    1288        3808 :             int         nattnums = 0;
    1289             :             int16      *attnums;
    1290        3808 :             TupleDesc   desc = RelationGetDescr(rel);
    1291             :             int         i;
    1292             : 
    1293        3808 :             attnums = (int16 *) palloc(desc->natts * sizeof(int16));
    1294             : 
    1295       10436 :             for (i = 0; i < desc->natts; i++)
    1296             :             {
    1297        6628 :                 Form_pg_attribute att = TupleDescAttr(desc, i);
    1298             : 
    1299        6628 :                 if (att->attisdropped)
    1300          12 :                     continue;
    1301             : 
    1302        6616 :                 if (att->attgenerated)
    1303             :                 {
    1304             :                     /* We only support replication of STORED generated cols. */
    1305          96 :                     if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
    1306          72 :                         continue;
    1307             : 
    1308             :                     /*
    1309             :                      * User hasn't requested to replicate STORED generated
    1310             :                      * cols.
    1311             :                      */
    1312          24 :                     if (pub->pubgencols_type != PUBLISH_GENCOLS_STORED)
    1313          18 :                         continue;
    1314             :                 }
    1315             : 
    1316        6526 :                 attnums[nattnums++] = att->attnum;
    1317             :             }
    1318             : 
    1319        3808 :             if (nattnums > 0)
    1320             :             {
    1321        3764 :                 values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
    1322        3764 :                 nulls[2] = false;
    1323             :             }
    1324             : 
    1325        3808 :             table_close(rel, AccessShareLock);
    1326             :         }
    1327             : 
    1328        4060 :         rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
    1329             : 
    1330        4060 :         SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
    1331             :     }
    1332             : 
    1333        1920 :     SRF_RETURN_DONE(funcctx);
    1334             : }

Generated by: LCOV version 1.14