LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_publication.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 424 427 99.3 %
Date: 2024-11-21 09:14:53 Functions: 28 28 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_publication.c
       4             :  *      publication C API manipulation
       5             :  *
       6             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      src/backend/catalog/pg_publication.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/genam.h"
      18             : #include "access/heapam.h"
      19             : #include "access/htup_details.h"
      20             : #include "access/tableam.h"
      21             : #include "catalog/catalog.h"
      22             : #include "catalog/dependency.h"
      23             : #include "catalog/indexing.h"
      24             : #include "catalog/namespace.h"
      25             : #include "catalog/objectaddress.h"
      26             : #include "catalog/partition.h"
      27             : #include "catalog/pg_inherits.h"
      28             : #include "catalog/pg_namespace.h"
      29             : #include "catalog/pg_publication.h"
      30             : #include "catalog/pg_publication_namespace.h"
      31             : #include "catalog/pg_publication_rel.h"
      32             : #include "catalog/pg_type.h"
      33             : #include "commands/publicationcmds.h"
      34             : #include "funcapi.h"
      35             : #include "utils/array.h"
      36             : #include "utils/builtins.h"
      37             : #include "utils/catcache.h"
      38             : #include "utils/fmgroids.h"
      39             : #include "utils/lsyscache.h"
      40             : #include "utils/rel.h"
      41             : #include "utils/syscache.h"
      42             : 
      43             : /*
                     :  * Records the association between a publication and a published table.
                     :  *
                     :  * Used as the element type of the table_infos lists handled by
                     :  * is_ancestor_member_tableinfos() and filter_partitions().
                     :  */
      44             : typedef struct
      45             : {
      46             :     Oid         relid;          /* OID of published table */
      47             :     Oid         pubid;          /* OID of publication that publishes this
      48             :                                  * table. */
      49             : } published_rel;
      50             : 
      51             : /*
      52             :  * Check whether a relation can be added to a publication, and throw an
      53             :  * appropriate error if it cannot.
                     :  *
                     :  * targetrel is the opened relation to validate.  An ERROR with
                     :  * ERRCODE_INVALID_PARAMETER_VALUE is raised unless the relation is a
                     :  * regular or partitioned table, is not a system table according to
                     :  * IsCatalogRelation(), and is neither temporary nor unlogged.  The
                     :  * function returns normally when all checks pass.
      54             :  */
      55             : static void
      56        1024 : check_publication_add_relation(Relation targetrel)
      57             : {
      58             :     /* Must be a regular or partitioned table */
      59        1024 :     if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
      60         142 :         RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
      61          14 :         ereport(ERROR,
      62             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      63             :                  errmsg("cannot add relation \"%s\" to publication",
      64             :                         RelationGetRelationName(targetrel)),
      65             :                  errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind)));
      66             : 
      67             :     /* Can't be system table */
      68        1010 :     if (IsCatalogRelation(targetrel))
      69           6 :         ereport(ERROR,
      70             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      71             :                  errmsg("cannot add relation \"%s\" to publication",
      72             :                         RelationGetRelationName(targetrel)),
      73             :                  errdetail("This operation is not supported for system tables.")));
      74             : 
      75             :     /* Temporary and unlogged relations cannot be part of a publication. */
      76        1004 :     if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
      77           6 :         ereport(ERROR,
      78             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      79             :                  errmsg("cannot add relation \"%s\" to publication",
      80             :                         RelationGetRelationName(targetrel)),
      81             :                  errdetail("This operation is not supported for temporary tables.")));
      82         998 :     else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
      83           6 :         ereport(ERROR,
      84             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      85             :                  errmsg("cannot add relation \"%s\" to publication",
      86             :                         RelationGetRelationName(targetrel)),
      87             :                  errdetail("This operation is not supported for unlogged tables.")));
      88         992 : }
      89             : 
      90             : /*
      91             :  * Check whether a schema can be added to a publication, and throw an
      92             :  * appropriate error if it cannot.
                     :  *
                     :  * schemaid is the OID of the namespace to validate.  An ERROR with
                     :  * ERRCODE_INVALID_PARAMETER_VALUE is raised for the catalog and TOAST
                     :  * namespaces and for any temporary namespace.  The function returns
                     :  * normally when both checks pass.
      93             :  */
      94             : static void
      95         204 : check_publication_add_schema(Oid schemaid)
      96             : {
      97             :     /* Can't be system namespace */
      98         204 :     if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid))
      99           6 :         ereport(ERROR,
     100             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     101             :                  errmsg("cannot add schema \"%s\" to publication",
     102             :                         get_namespace_name(schemaid)),
     103             :                  errdetail("This operation is not supported for system schemas.")));
     104             : 
     105             :     /* Can't be temporary namespace */
     106         198 :     if (isAnyTempNamespace(schemaid))
     107           0 :         ereport(ERROR,
     108             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     109             :                  errmsg("cannot add schema \"%s\" to publication",
     110             :                         get_namespace_name(schemaid)),
     111             :                  errdetail("Temporary schemas cannot be replicated.")));
     112         198 : }
     113             : 
     114             : /*
     115             :  * Returns true if the relation represented by relid and its Form_pg_class
     116             :  * entry is publishable.
                     :  *
                     :  * A relation is publishable when it is a regular or partitioned table, is
                     :  * not a catalog relation according to IsCatalogRelationOid(), has
                     :  * permanent persistence, and has relid >= FirstNormalObjectId.
     117             :  *
     118             :  * Does same checks as check_publication_add_relation() above, but does not
     119             :  * need relation to be opened and also does not throw errors.
     120             :  *
     121             :  * XXX  This also excludes all tables with relid < FirstNormalObjectId,
     122             :  * ie all tables created during initdb.  This mainly affects the preinstalled
     123             :  * information_schema.  IsCatalogRelationOid() only excludes tables with
     124             :  * relid < FirstUnpinnedObjectId, making that test rather redundant,
     125             :  * but really we should get rid of the FirstNormalObjectId test not
     126             :  * IsCatalogRelationOid.  We can't do so today because we don't want
     127             :  * information_schema tables to be considered publishable; but this test
     128             :  * is really inadequate for that, since the information_schema could be
     129             :  * dropped and reloaded and then it'll be considered publishable.  The best
     130             :  * long-term solution may be to add a "relispublishable" bool to pg_class,
     131             :  * and depend on that instead of OID checks.
     132             :  */
     133             : static bool
     134      587916 : is_publishable_class(Oid relid, Form_pg_class reltuple)
     135             : {
     136      599256 :     return (reltuple->relkind == RELKIND_RELATION ||
     137       11340 :             reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
     138      577984 :         !IsCatalogRelationOid(relid) &&
     139     1175832 :         reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
     140             :         relid >= FirstNormalObjectId;
     141             : }
     142             : 
     143             : /*
     144             :  * Another variant of is_publishable_class(), taking a Relation.
                     :  *
                     :  * Passes the relation OID and rel->rd_rel to is_publishable_class() and
                     :  * returns its result unchanged.
     145             :  */
     146             : bool
     147      538292 : is_publishable_relation(Relation rel)
     148             : {
     149      538292 :     return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
     150             : }
     151             : 
     152             : /*
     153             :  * SQL-callable variant of is_publishable_class()
                     :  *
                     :  * Takes the relation OID as argument 0, looks up its pg_class entry in the
                     :  * RELOID syscache, and returns the is_publishable_class() result as a
                     :  * boolean.  The syscache reference is released before returning.
     154             :  *
     155             :  * This returns null when the relation does not exist.  This is intended to be
     156             :  * used for example in psql to avoid gratuitous errors when there are
     157             :  * concurrent catalog changes.
     158             :  */
     159             : Datum
     160        5540 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
     161             : {
     162        5540 :     Oid         relid = PG_GETARG_OID(0);
     163             :     HeapTuple   tuple;
     164             :     bool        result;
     165             : 
     166        5540 :     tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
     167        5540 :     if (!HeapTupleIsValid(tuple))
     168           0 :         PG_RETURN_NULL();
     169        5540 :     result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
     170        5540 :     ReleaseSysCache(tuple);
     171        5540 :     PG_RETURN_BOOL(result);
     172             : }
     173             : 
     174             : /*
     175             :  * Returns true if the ancestor is in the list of published relations.
     176             :  * Otherwise, returns false.
                     :  *
                     :  * table_infos is a list of published_rel pointers; only the relid field of
                     :  * each entry is compared against ancestor.  The search stops at the first
                     :  * match.
     177             :  */
     178             : static bool
     179         198 : is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
     180             : {
     181             :     ListCell   *lc;
     182             : 
     183         670 :     foreach(lc, table_infos)
     184             :     {
     185         552 :         Oid         relid = ((published_rel *) lfirst(lc))->relid;
     186             : 
     187         552 :         if (relid == ancestor)
     188          80 :             return true;
     189             :     }
     190             : 
     191         118 :     return false;
     192             : }
     193             : 
     194             : /*
     195             :  * Filter out the partitions whose ancestors are also present in the list.
                     :  *
                     :  * table_infos is a list of published_rel pointers.  For every entry whose
                     :  * relation is a partition, all of its partition ancestors are looked up;
                     :  * if any of them also appears in table_infos (matched by relid), the entry
                     :  * is deleted from the list while iterating.
                     :  *
                     :  * NOTE(review): the function returns void and the list returned by
                     :  * foreach_delete_current() is assigned only to the local parameter, so
                     :  * callers presumably rely on the list being modified in place -- confirm.
     196             :  */
     197             : static void
     198         336 : filter_partitions(List *table_infos)
     199             : {
     200             :     ListCell   *lc;
     201             : 
     202         998 :     foreach(lc, table_infos)
     203             :     {
     204         662 :         bool        skip = false;
     205         662 :         List       *ancestors = NIL;
     206             :         ListCell   *lc2;
     207         662 :         published_rel *table_info = (published_rel *) lfirst(lc);
     208             : 
     209         662 :         if (get_rel_relispartition(table_info->relid))
     210         198 :             ancestors = get_partition_ancestors(table_info->relid);
     211             : 
     212         780 :         foreach(lc2, ancestors)
     213             :         {
     214         198 :             Oid         ancestor = lfirst_oid(lc2);
     215             : 
     216         198 :             if (is_ancestor_member_tableinfos(ancestor, table_infos))
     217             :             {
     218          80 :                 skip = true;
     219          80 :                 break;
     220             :             }
     221             :         }
     222             : 
     223         662 :         if (skip)
     224          80 :             table_infos = foreach_delete_current(table_infos, lc);
     225             :     }
     226         336 : }
     227             : 
     228             : /*
     229             :  * Returns true if any schema is associated with the publication, false if no
     230             :  * schema is associated with the publication.
                     :  *
                     :  * Opens pg_publication_namespace with AccessShareLock and runs an index
                     :  * scan with a single equality key on pnpubid = pubid.  Only the first
                     :  * tuple is fetched: the result is whether that tuple is valid.  The scan
                     :  * is ended and the catalog closed before returning.
     231             :  */
     232             : bool
     233         262 : is_schema_publication(Oid pubid)
     234             : {
     235             :     Relation    pubschsrel;
     236             :     ScanKeyData scankey;
     237             :     SysScanDesc scan;
     238             :     HeapTuple   tup;
     239         262 :     bool        result = false;
     240             : 
     241         262 :     pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
     242         262 :     ScanKeyInit(&scankey,
     243             :                 Anum_pg_publication_namespace_pnpubid,
     244             :                 BTEqualStrategyNumber, F_OIDEQ,
     245             :                 ObjectIdGetDatum(pubid));
     246             : 
     247         262 :     scan = systable_beginscan(pubschsrel,
     248             :                               PublicationNamespacePnnspidPnpubidIndexId,
     249             :                               true, NULL, 1, &scankey);
     250         262 :     tup = systable_getnext(scan);
     251         262 :     result = HeapTupleIsValid(tup);
     252             : 
     253         262 :     systable_endscan(scan);
     254         262 :     table_close(pubschsrel, AccessShareLock);
     255             : 
     256         262 :     return result;
     257             : }
     258             : 
     259             : /*
     260             :  * Returns true if the relation has a column list associated with the
     261             :  * publication, false otherwise.
     262             :  *
     263             :  * If a column list is found, the corresponding bitmap is returned through the
     264             :  * cols parameter, if provided. The bitmap is constructed within the given
     265             :  * memory context (mcxt).
                     :  *
                     :  * A publication with alltables set always yields false, without any
                     :  * catalog cache lookup.  Otherwise the (relid, pub->oid) entry is looked
                     :  * up in the PUBLICATIONRELMAP syscache, and a column list is considered
                     :  * present when its prattrs attribute is not null.
                     :  *
                     :  * The current value of *cols is passed to pub_collist_to_bitmapset() as
                     :  * the set to extend, so the caller must initialize it before the call.
     266             :  */
     267             : bool
     268         556 : check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt,
     269             :                             Bitmapset **cols)
     270             : {
     271             :     HeapTuple   cftuple;
     272         556 :     bool        found = false;
     273             : 
     274         556 :     if (pub->alltables)
     275         142 :         return false;
     276             : 
     277         414 :     cftuple = SearchSysCache2(PUBLICATIONRELMAP,
     278             :                               ObjectIdGetDatum(relid),
     279             :                               ObjectIdGetDatum(pub->oid));
     280         414 :     if (HeapTupleIsValid(cftuple))
     281             :     {
     282             :         Datum       cfdatum;
     283             :         bool        isnull;
     284             : 
     285             :         /* Lookup the column list attribute. */
     286         376 :         cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
     287             :                                   Anum_pg_publication_rel_prattrs, &isnull);
     288             : 
     289             :         /* Was a column list found? */
     290         376 :         if (!isnull)
     291             :         {
     292             :             /* Build the column list bitmap in the given memory context. */
     293          84 :             if (cols)
     294          78 :                 *cols = pub_collist_to_bitmapset(*cols, cfdatum, mcxt);
     295             : 
     296          84 :             found = true;
     297             :         }
     298             : 
     299         376 :         ReleaseSysCache(cftuple);
     300             :     }
     301             : 
     302         414 :     return found;
     303             : }
     304             : 
     305             : /*
     306             :  * Gets the relations based on the publication partition option for a specified
     307             :  * relation.
                     :  *
                     :  * The selected relation OIDs are appended to result, and the possibly
                     :  * updated list is returned.
                     :  *
                     :  * If relid is not a partitioned table, or pub_partopt is
                     :  * PUBLICATION_PART_ROOT, only relid itself is appended.  Otherwise the
                     :  * list from find_all_inheritors() (called with NoLock) is used:
                     :  * PUBLICATION_PART_ALL appends the whole list, while PUBLICATION_PART_LEAF
                     :  * appends only the members that are not themselves partitioned tables.
                     :  * Any other option value trips an assertion.
     308             :  */
     309             : List *
     310        5428 : GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
     311             :                                Oid relid)
     312             : {
     313        5428 :     if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
     314             :         pub_partopt != PUBLICATION_PART_ROOT)
     315        1128 :     {
     316        1128 :         List       *all_parts = find_all_inheritors(relid, NoLock,
     317             :                                                     NULL);
     318             : 
     319        1128 :         if (pub_partopt == PUBLICATION_PART_ALL)
     320         934 :             result = list_concat(result, all_parts);
     321         194 :         else if (pub_partopt == PUBLICATION_PART_LEAF)
     322             :         {
     323             :             ListCell   *lc;
     324             : 
     325         710 :             foreach(lc, all_parts)
     326             :             {
     327         516 :                 Oid         partOid = lfirst_oid(lc);
     328             : 
     329         516 :                 if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
     330         322 :                     result = lappend_oid(result, partOid);
     331             :             }
     332             :         }
     333             :         else
     334             :             Assert(false);
     335             :     }
     336             :     else
     337        4300 :         result = lappend_oid(result, relid);
     338             : 
     339        5428 :     return result;
     340             : }
     341             : 
     342             : /*
     343             :  * Returns the relid of the topmost ancestor that is published via this
     344             :  * publication if any and set its ancestor level to ancestor_level,
     345             :  * otherwise returns InvalidOid.
     346             :  *
     347             :  * The ancestor_level value allows us to compare the results for multiple
     348             :  * publications, and decide which value is higher up.
     349             :  *
     350             :  * Note that the list of ancestors should be ordered such that the topmost
     351             :  * ancestor is at the end of the list.
                     :  *
                     :  * An ancestor counts as published if puboid is among the publications of
                     :  * the ancestor itself or, failing that, among the publications of its
                     :  * schema.  Levels are counted from 1, starting at the first element of
                     :  * ancestors, and a later match overwrites an earlier one.
                     :  *
                     :  * ancestor_level may be NULL; when it is not, it is written only if a
                     :  * published ancestor is found and is left untouched otherwise.
     352             :  */
     353             : Oid
     354         474 : GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
     355             : {
     356             :     ListCell   *lc;
     357         474 :     Oid         topmost_relid = InvalidOid;
     358         474 :     int         level = 0;
     359             : 
     360             :     /*
     361             :      * Find the "topmost" ancestor that is in this publication.
     362             :      */
     363         964 :     foreach(lc, ancestors)
     364             :     {
     365         490 :         Oid         ancestor = lfirst_oid(lc);
     366         490 :         List       *apubids = GetRelationPublications(ancestor);
     367         490 :         List       *aschemaPubids = NIL;
     368             : 
     369         490 :         level++;
     370             : 
     371         490 :         if (list_member_oid(apubids, puboid))
     372             :         {
     373         312 :             topmost_relid = ancestor;
     374             : 
     375         312 :             if (ancestor_level)
     376          86 :                 *ancestor_level = level;
     377             :         }
     378             :         else
     379             :         {
     380         178 :             aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
     381         178 :             if (list_member_oid(aschemaPubids, puboid))
     382             :             {
     383          10 :                 topmost_relid = ancestor;
     384             : 
     385          10 :                 if (ancestor_level)
     386          10 :                     *ancestor_level = level;
     387             :             }
     388             :         }
     389             : 
     390         490 :         list_free(apubids);
     391         490 :         list_free(aschemaPubids);
     392             :     }
     393             : 
     394         474 :     return topmost_relid;
     395             : }
     396             : 
     397             : /*
     398             :  * attnumstoint2vector
     399             :  *      Convert a Bitmapset of AttrNumbers into an int2vector.
     400             :  *
     401             :  * The bitmapset members are plain attribute numbers, i.e., they are not
     402             :  * offset by FirstLowInvalidHeapAttributeNumber.
                     :  *
                     :  * The result is built with one slot per member of attrs and is filled in
                     :  * the order in which bms_next_member() returns the members.  Each member
                     :  * is asserted to fit in an int16 before being stored.
     403             :  */
     404             : static int2vector *
     405         306 : attnumstoint2vector(Bitmapset *attrs)
     406             : {
     407             :     int2vector *result;
     408         306 :     int         n = bms_num_members(attrs);
     409         306 :     int         i = -1;
     410         306 :     int         j = 0;
     411             : 
     412         306 :     result = buildint2vector(NULL, n);
     413             : 
     414         840 :     while ((i = bms_next_member(attrs, i)) >= 0)
     415             :     {
     416             :         Assert(i <= PG_INT16_MAX);
     417             : 
     418         534 :         result->values[j++] = (int16) i;
     419             :     }
     420             : 
     421         306 :     return result;
     422             : }
     423             : 
     424             : /*
     425             :  * Insert new publication / relation mapping.
                     :  *
                     :  * pubid is the OID of the publication.  pri carries the target relation
                     :  * together with its optional qualification (whereClause) and optional
                     :  * column list (columns).
                     :  *
                     :  * If the mapping already exists, InvalidObjectAddress is returned when
                     :  * if_not_exists is true, and an ERRCODE_DUPLICATE_OBJECT error is raised
                     :  * otherwise.  The relation and the column list are validated before the
                     :  * pg_publication_rel row is inserted, so either check may raise an error.
                     :  *
                     :  * On success the address of the new pg_publication_rel row is returned,
                     :  * after its dependencies have been recorded and the relcache entries of
                     :  * the relation (and of its whole partition hierarchy, for a partitioned
                     :  * table) have been invalidated.
     426             :  */
     427             : ObjectAddress
     428        1046 : publication_add_relation(Oid pubid, PublicationRelInfo *pri,
     429             :                          bool if_not_exists)
     430             : {
     431             :     Relation    rel;
     432             :     HeapTuple   tup;
     433             :     Datum       values[Natts_pg_publication_rel];
     434             :     bool        nulls[Natts_pg_publication_rel];
     435        1046 :     Relation    targetrel = pri->relation;
     436        1046 :     Oid         relid = RelationGetRelid(targetrel);
     437             :     Oid         pubreloid;
     438             :     Bitmapset  *attnums;
     439        1046 :     Publication *pub = GetPublication(pubid);
     440             :     ObjectAddress myself,
     441             :                 referenced;
     442        1046 :     List       *relids = NIL;
     443             :     int         i;
     444             : 
     445        1046 :     rel = table_open(PublicationRelRelationId, RowExclusiveLock);
     446             : 
     447             :     /*
     448             :      * Check for duplicates. Note that this does not really prevent
     449             :      * duplicates, it's here just to provide nicer error message in common
     450             :      * case. The real protection is the unique key on the catalog.
     451             :      */
     452        1046 :     if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
     453             :                               ObjectIdGetDatum(pubid)))
     454             :     {
     455          22 :         table_close(rel, RowExclusiveLock);
     456             : 
     457          22 :         if (if_not_exists)
     458          16 :             return InvalidObjectAddress;
     459             : 
     460           6 :         ereport(ERROR,
     461             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     462             :                  errmsg("relation \"%s\" is already member of publication \"%s\"",
     463             :                         RelationGetRelationName(targetrel), pub->name)));
     464             :     }
     465             : 
     466        1024 :     check_publication_add_relation(targetrel);
     467             : 
     468             :     /* Validate and translate column names into a Bitmapset of attnums. */
     469         992 :     attnums = pub_collist_validate(pri->relation, pri->columns);
     470             : 
     471             :     /* Form a tuple. */
     472         974 :     memset(values, 0, sizeof(values));
     473         974 :     memset(nulls, false, sizeof(nulls));
     474             : 
     475         974 :     pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
     476             :                                    Anum_pg_publication_rel_oid);
     477         974 :     values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid);
     478         974 :     values[Anum_pg_publication_rel_prpubid - 1] =
     479         974 :         ObjectIdGetDatum(pubid);
     480         974 :     values[Anum_pg_publication_rel_prrelid - 1] =
     481         974 :         ObjectIdGetDatum(relid);
     482             : 
     483             :     /* Store the qualification as node text, or NULL if there is none */
     484         974 :     if (pri->whereClause != NULL)
     485         298 :         values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
     486             :     else
     487         676 :         nulls[Anum_pg_publication_rel_prqual - 1] = true;
     488             : 
     489             :     /* Store the column list as an int2vector, or NULL if there is none */
     490         974 :     if (pri->columns)
     491         306 :         values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(attnumstoint2vector(attnums));
     492             :     else
     493         668 :         nulls[Anum_pg_publication_rel_prattrs - 1] = true;
     494             : 
     495         974 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     496             : 
     497             :     /* Insert tuple into catalog. */
     498         974 :     CatalogTupleInsert(rel, tup);
     499         974 :     heap_freetuple(tup);
     500             : 
     501             :     /* Register dependencies as needed */
     502         974 :     ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
     503             : 
     504             :     /* Add dependency on the publication */
     505         974 :     ObjectAddressSet(referenced, PublicationRelationId, pubid);
     506         974 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     507             : 
     508             :     /* Add dependency on the relation */
     509         974 :     ObjectAddressSet(referenced, RelationRelationId, relid);
     510         974 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     511             : 
     512             :     /* Add dependency on the objects mentioned in the qualifications */
     513         974 :     if (pri->whereClause)
     514         298 :         recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
     515             :                                         DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
     516             :                                         false);
     517             : 
     518             :     /* Add dependency on the columns, if any are listed */
     519         974 :     i = -1;
     520        1508 :     while ((i = bms_next_member(attnums, i)) >= 0)
     521             :     {
     522         534 :         ObjectAddressSubSet(referenced, RelationRelationId, relid, i);
     523         534 :         recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
     524             :     }
     525             : 
     526             :     /* Close the table. */
     527         974 :     table_close(rel, RowExclusiveLock);
     528             : 
     529             :     /*
     530             :      * Invalidate relcache so that publication info is rebuilt.
     531             :      *
     532             :      * For the partitioned tables, we must invalidate all partitions contained
     533             :      * in the respective partition hierarchies, not just the one explicitly
     534             :      * mentioned in the publication. This is required because we implicitly
     535             :      * publish the child tables when the parent table is published.
     536             :      */
     537         974 :     relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
     538             :                                             relid);
     539             : 
     540         974 :     InvalidatePublicationRels(relids);
     541             : 
     542         974 :     return myself;
     543             : }
     544             : 
     545             : /*
     546             :  * pub_collist_validate
     547             :  *      Process and validate the 'columns' list and ensure the columns are all
     548             :  *      valid to use for a publication.  Checks for and raises an ERROR for
     549             :  *      any unknown columns, system columns, or duplicate columns.
     550             :  *
     551             :  * Looks up each column's attnum and returns a 0-based Bitmapset of the
     552             :  * corresponding attnums.
                     :  *
                     :  * columns is a list of string nodes holding column names of targetrel.
                     :  * The result is NULL when the list is empty.  The errors raised are
                     :  * ERRCODE_UNDEFINED_COLUMN for an unknown column,
                     :  * ERRCODE_INVALID_COLUMN_REFERENCE for a system column, and
                     :  * ERRCODE_DUPLICATE_OBJECT for a column listed more than once.
     553             :  */
     554             : Bitmapset *
     555        1396 : pub_collist_validate(Relation targetrel, List *columns)
     556             : {
     557        1396 :     Bitmapset  *set = NULL;
     558             :     ListCell   *lc;
     559             : 
     560        2196 :     foreach(lc, columns)
     561             :     {
     562         830 :         char       *colname = strVal(lfirst(lc));
     563         830 :         AttrNumber  attnum = get_attnum(RelationGetRelid(targetrel), colname);
     564             : 
     565         830 :         if (attnum == InvalidAttrNumber)
     566           6 :             ereport(ERROR,
     567             :                     errcode(ERRCODE_UNDEFINED_COLUMN),
     568             :                     errmsg("column \"%s\" of relation \"%s\" does not exist",
     569             :                            colname, RelationGetRelationName(targetrel)));
     570             : 
     571         824 :         if (!AttrNumberIsForUserDefinedAttr(attnum))
     572          12 :             ereport(ERROR,
     573             :                     errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     574             :                     errmsg("cannot use system column \"%s\" in publication column list",
     575             :                            colname));
     576             : 
     577         812 :         if (bms_is_member(attnum, set))
     578          12 :             ereport(ERROR,
     579             :                     errcode(ERRCODE_DUPLICATE_OBJECT),
     580             :                     errmsg("duplicate column \"%s\" in publication column list",
     581             :                            colname));
     582             : 
     583         800 :         set = bms_add_member(set, attnum);
     584             :     }
     585             : 
     586        1366 :     return set;
     587             : }
     588             : 
     589             : /*
     590             :  * Transform a column list (an array Datum of int16 values) to a bitmapset.
     591             :  *
     592             :  * If columns isn't NULL, the column numbers are added to that set and the
     593             :  * (possibly extended) set is returned; otherwise a new set is built.
     594             :  * If mcxt isn't NULL, build the bitmapset in that context.
     595             :  */
     596             : Bitmapset *
     597         444 : pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
     598             : {
     599         444 :     Bitmapset  *result = columns;
     600             :     ArrayType  *arr;
     601             :     int         nelems;
     602             :     int16      *elems;
     603         444 :     MemoryContext oldcxt = NULL;
     604             : 
     605         444 :     arr = DatumGetArrayTypeP(pubcols);
     606         444 :     nelems = ARR_DIMS(arr)[0];
     607         444 :     elems = (int16 *) ARR_DATA_PTR(arr);
     608             : 
     609             :     /* Switch to the caller's context, if given, so the set is built there. */
     610         444 :     if (mcxt)
     611          78 :         oldcxt = MemoryContextSwitchTo(mcxt);
     612             : 
     613        1226 :     for (int i = 0; i < nelems; i++)
     614         782 :         result = bms_add_member(result, elems[i]);
     615             : 
     616         444 :     if (mcxt)
     617          78 :         MemoryContextSwitchTo(oldcxt);
     618             : 
     619         444 :     return result;
     620             : }
     621             : 
     622             : /*
     623             :  * Returns a bitmap of the attnums of the specified table's columns.
     624             :  * Dropped columns are always skipped.
     625             :  * Generated columns are included only if include_gencols is true.
     626             :  */
     627             : Bitmapset *
     628          18 : pub_form_cols_map(Relation relation, bool include_gencols)
     629             : {
     630          18 :     Bitmapset  *result = NULL;
     631          18 :     TupleDesc   desc = RelationGetDescr(relation);
     632             : 
     633          58 :     for (int i = 0; i < desc->natts; i++)
     634             :     {
     635          40 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     636             : 
     637          40 :         if (att->attisdropped || (att->attgenerated && !include_gencols))
     638           4 :             continue;
     639             : 
     640          36 :         result = bms_add_member(result, att->attnum);
     641             :     }
     642             : 
     643          18 :     return result;
     644             : }
     645             : 
     646             : /*
     647             :  * Insert new publication / schema mapping; no-op if present and if_not_exists.
     648             :  */
     649             : ObjectAddress
     650         222 : publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
     651             : {
     652             :     Relation    rel;
     653             :     HeapTuple   tup;
     654             :     Datum       values[Natts_pg_publication_namespace];
     655             :     bool        nulls[Natts_pg_publication_namespace];
     656             :     Oid         psschid;
     657         222 :     Publication *pub = GetPublication(pubid);
     658         222 :     List       *schemaRels = NIL;
     659             :     ObjectAddress myself,
     660             :                 referenced;
     661             : 
     662         222 :     rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
     663             : 
     664             :     /*
     665             :      * Check for duplicates. Note that this does not really prevent
     666             :      * duplicates; it's here just to provide a nicer error message in the
     667             :      * common case. The real protection is the unique key on the catalog.
     668             :      */
     669         222 :     if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
     670             :                               ObjectIdGetDatum(schemaid),
     671             :                               ObjectIdGetDatum(pubid)))
     672             :     {
     673          18 :         table_close(rel, RowExclusiveLock);
     674             : 
     675          18 :         if (if_not_exists)
     676          12 :             return InvalidObjectAddress;
     677             : 
     678           6 :         ereport(ERROR,
     679             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     680             :                  errmsg("schema \"%s\" is already member of publication \"%s\"",
     681             :                         get_namespace_name(schemaid), pub->name)));
     682             :     }
     683             : 
     684         204 :     check_publication_add_schema(schemaid);
     685             : 
     686             :     /* Form a tuple: new row oid, publication oid, schema oid */
     687         198 :     memset(values, 0, sizeof(values));
     688         198 :     memset(nulls, false, sizeof(nulls));
     689             : 
     690         198 :     psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId,
     691             :                                  Anum_pg_publication_namespace_oid);
     692         198 :     values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid);
     693         198 :     values[Anum_pg_publication_namespace_pnpubid - 1] =
     694         198 :         ObjectIdGetDatum(pubid);
     695         198 :     values[Anum_pg_publication_namespace_pnnspid - 1] =
     696         198 :         ObjectIdGetDatum(schemaid);
     697             : 
     698         198 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     699             : 
     700             :     /* Insert tuple into catalog, then free our local copy */
     701         198 :     CatalogTupleInsert(rel, tup);
     702         198 :     heap_freetuple(tup);
     703             : 
     704         198 :     ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid);
     705             : 
     706             :     /* Add dependency on the publication */
     707         198 :     ObjectAddressSet(referenced, PublicationRelationId, pubid);
     708         198 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     709             : 
     710             :     /* Add dependency on the schema */
     711         198 :     ObjectAddressSet(referenced, NamespaceRelationId, schemaid);
     712         198 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     713             : 
     714             :     /* Close the table */
     715         198 :     table_close(rel, RowExclusiveLock);
     716             : 
     717             :     /*
     718             :      * Invalidate relcache so that publication info is rebuilt. See
     719             :      * publication_add_relation for why we need to consider all the
     720             :      * partitions (hence PUBLICATION_PART_ALL below).
     721             :      */
     722         198 :     schemaRels = GetSchemaPublicationRelations(schemaid,
     723             :                                                PUBLICATION_PART_ALL);
     724         198 :     InvalidatePublicationRels(schemaRels);
     725             : 
     726         198 :     return myself;
     727             : }
     728             : 
     729             : /* Gets list of publication oids mapped to a relation (PUBLICATIONRELMAP) */
     730             : List *
     731       11476 : GetRelationPublications(Oid relid)
     732             : {
     733       11476 :     List       *result = NIL;
     734             :     CatCList   *pubrellist;
     735             :     int         i;
     736             : 
     737             :     /* Collect prpubid from every syscache list entry for this relid. */
     738       11476 :     pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
     739             :                                      ObjectIdGetDatum(relid));
     740       12970 :     for (i = 0; i < pubrellist->n_members; i++)
     741             :     {
     742        1494 :         HeapTuple   tup = &pubrellist->members[i]->tuple;
     743        1494 :         Oid         pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
     744             : 
     745        1494 :         result = lappend_oid(result, pubid);
     746             :     }
     747             : 
     748       11476 :     ReleaseSysCacheList(pubrellist);
     749             : 
     750       11476 :     return result;
     751             : }
     752             : 
     753             : /*
     754             :  * Gets list of relation oids for a publication, sorted and de-duplicated.
     755             :  * pub_partopt is passed to GetPubPartitionOptionRelations for each table.
     756             :  * This should only be used for FOR TABLE publications; FOR ALL TABLES
     757             :  * publications should use GetAllTablesPublicationRelations().
     758             :  */
     759             : List *
     760        2128 : GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
     761             : {
     762             :     List       *result;
     763             :     Relation    pubrelsrel;
     764             :     ScanKeyData scankey;
     765             :     SysScanDesc scan;
     766             :     HeapTuple   tup;
     767             : 
     768             :     /* Find all relations associated with the publication. */
     769        2128 :     pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
     770             : 
     771        2128 :     ScanKeyInit(&scankey,
     772             :                 Anum_pg_publication_rel_prpubid,
     773             :                 BTEqualStrategyNumber, F_OIDEQ,
     774             :                 ObjectIdGetDatum(pubid));
     775             : 
     776        2128 :     scan = systable_beginscan(pubrelsrel, PublicationRelPrpubidIndexId,
     777             :                               true, NULL, 1, &scankey);
     778             : 
     779        2128 :     result = NIL;
     780        5016 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     781             :     {
     782             :         Form_pg_publication_rel pubrel;
     783             : 
     784        2888 :         pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
     785        2888 :         result = GetPubPartitionOptionRelations(result, pub_partopt,
     786             :                                                 pubrel->prrelid);
     787             :     }
     788             : 
     789        2128 :     systable_endscan(scan);
     790        2128 :     table_close(pubrelsrel, AccessShareLock);
     791             : 
     792             :     /* Now sort and de-duplicate the result list */
     793        2128 :     list_sort(result, list_oid_cmp);
     794        2128 :     list_deduplicate_oid(result);
     795             : 
     796        2128 :     return result;
     797             : }
     798             : 
     799             : /*
     800             :  * Gets list of publication oids for publications marked as FOR ALL TABLES.
     801             :  */
     802             : List *
     803        7848 : GetAllTablesPublications(void)
     804             : {
     805             :     List       *result;
     806             :     Relation    rel;
     807             :     ScanKeyData scankey;
     808             :     SysScanDesc scan;
     809             :     HeapTuple   tup;
     810             : 
     811             :     /* Scan pg_publication for rows with puballtables = true (no index used). */
     812        7848 :     rel = table_open(PublicationRelationId, AccessShareLock);
     813             : 
     814        7848 :     ScanKeyInit(&scankey,
     815             :                 Anum_pg_publication_puballtables,
     816             :                 BTEqualStrategyNumber, F_BOOLEQ,
     817             :                 BoolGetDatum(true));
     818             : 
     819        7848 :     scan = systable_beginscan(rel, InvalidOid, false,
     820             :                               NULL, 1, &scankey);
     821             : 
     822        7848 :     result = NIL;
     823        7996 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     824             :     {
     825         148 :         Oid         oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
     826             : 
     827         148 :         result = lappend_oid(result, oid);
     828             :     }
     829             : 
     830        7848 :     systable_endscan(scan);
     831        7848 :     table_close(rel, AccessShareLock);
     832             : 
     833        7848 :     return result;
     834             : }
     835             : 
     836             : /*
     837             :  * Gets list of all relations published by FOR ALL TABLES publication(s).
     838             :  *
     839             :  * If the publication publishes partition changes via their respective root
     840             :  * partitioned tables (pubviaroot), we must exclude partitions in favor of
     841             :  * including the partitioned tables that are not themselves partitions.
     842             :  */
     843             : List *
     844         280 : GetAllTablesPublicationRelations(bool pubviaroot)
     845             : {
     846             :     Relation    classRel;
     847             :     ScanKeyData key[1];
     848             :     TableScanDesc scan;
     849             :     HeapTuple   tuple;
     850         280 :     List       *result = NIL;
     851             : 
     852         280 :     classRel = table_open(RelationRelationId, AccessShareLock);
     853             : 
     854         280 :     ScanKeyInit(&key[0],
     855             :                 Anum_pg_class_relkind,
     856             :                 BTEqualStrategyNumber, F_CHAREQ,
     857             :                 CharGetDatum(RELKIND_RELATION));
     858             : 
     859         280 :     scan = table_beginscan_catalog(classRel, 1, key);
     860             : 
     861       20746 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
     862             :     {
     863       20466 :         Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
     864       20466 :         Oid         relid = relForm->oid;
     865             : 
     866       20466 :         if (is_publishable_class(relid, relForm) &&
     867        1414 :             !(relForm->relispartition && pubviaroot))
     868        1232 :             result = lappend_oid(result, relid);
     869             :     }
     870             : 
     871         280 :     table_endscan(scan);
     872             : 
     873         280 :     if (pubviaroot)
     874             :     {
     875          26 :         ScanKeyInit(&key[0],
     876             :                     Anum_pg_class_relkind,
     877             :                     BTEqualStrategyNumber, F_CHAREQ,
     878             :                     CharGetDatum(RELKIND_PARTITIONED_TABLE));
     879             : 
     880          26 :         scan = table_beginscan_catalog(classRel, 1, key);
     881             : 
     882         156 :         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
     883             :         {
     884         130 :             Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
     885         130 :             Oid         relid = relForm->oid;
     886             : 
     887         130 :             if (is_publishable_class(relid, relForm) &&
     888         130 :                 !relForm->relispartition)
     889         104 :                 result = lappend_oid(result, relid);
     890             :         }
     891             : 
     892          26 :         table_endscan(scan);
     893             :     }
     894             : 
     895         280 :     table_close(classRel, AccessShareLock);
     896         280 :     return result;
     897             : }
     898             : 
     899             : /*
     900             :  * Gets the list of schema oids for a publication.
     901             :  *
     902             :  * This should only be used for FOR TABLES IN SCHEMA publications.
     903             :  */
     904             : List *
     905        2046 : GetPublicationSchemas(Oid pubid)
     906             : {
     907        2046 :     List       *result = NIL;
     908             :     Relation    pubschsrel;
     909             :     ScanKeyData scankey;
     910             :     SysScanDesc scan;
     911             :     HeapTuple   tup;
     912             : 
     913             :     /* Find all schemas associated with the publication (key: pnpubid) */
     914        2046 :     pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
     915             : 
     916        2046 :     ScanKeyInit(&scankey,
     917             :                 Anum_pg_publication_namespace_pnpubid,
     918             :                 BTEqualStrategyNumber, F_OIDEQ,
     919             :                 ObjectIdGetDatum(pubid));
     920             : 
     921        2046 :     scan = systable_beginscan(pubschsrel,
     922             :                               PublicationNamespacePnnspidPnpubidIndexId,
     923             :                               true, NULL, 1, &scankey);
     924        2146 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     925             :     {
     926             :         Form_pg_publication_namespace pubsch;
     927             : 
     928         100 :         pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
     929             : 
     930         100 :         result = lappend_oid(result, pubsch->pnnspid);
     931             :     }
     932             : 
     933        2046 :     systable_endscan(scan);
     934        2046 :     table_close(pubschsrel, AccessShareLock);
     935             : 
     936        2046 :     return result;
     937             : }
     938             : 
     939             : /*
     940             :  * Gets the list of publication oids associated with a specified schema.
     941             :  */
     942             : List *
     943       11114 : GetSchemaPublications(Oid schemaid)
     944             : {
     945       11114 :     List       *result = NIL;
     946             :     CatCList   *pubschlist;
     947             :     int         i;
     948             : 
     949             :     /* Collect pnpubid from every syscache list entry for this schema */
     950       11114 :     pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
     951             :                                      ObjectIdGetDatum(schemaid));
     952       11228 :     for (i = 0; i < pubschlist->n_members; i++)
     953             :     {
     954         114 :         HeapTuple   tup = &pubschlist->members[i]->tuple;
     955         114 :         Oid         pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
     956             : 
     957         114 :         result = lappend_oid(result, pubid);
     958             :     }
     959             : 
     960       11114 :     ReleaseSysCacheList(pubschlist);
     961             : 
     962       11114 :     return result;
     963             : }
     964             : 
     965             : /*
     966             :  * Get the list of publishable relation oids for a specified (valid) schema.
     967             :  */
     968             : List *
     969         460 : GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
     970             : {
     971             :     Relation    classRel;
     972             :     ScanKeyData key[1];
     973             :     TableScanDesc scan;
     974             :     HeapTuple   tuple;
     975         460 :     List       *result = NIL;
     976             : 
     977             :     Assert(OidIsValid(schemaid));
     978             : 
     979         460 :     classRel = table_open(RelationRelationId, AccessShareLock);
     980             : 
     981         460 :     ScanKeyInit(&key[0],
     982             :                 Anum_pg_class_relnamespace,
     983             :                 BTEqualStrategyNumber, F_OIDEQ,
     984             :                 ObjectIdGetDatum(schemaid));
     985             : 
     986             :     /* scan pg_class for all the relations present in the specified schema */
     987         460 :     scan = table_beginscan_catalog(classRel, 1, key);
     988       23948 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
     989             :     {
     990       23488 :         Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
     991       23488 :         Oid         relid = relForm->oid;
     992             :         char        relkind;
     993             : 
     994       23488 :         if (!is_publishable_class(relid, relForm))
     995        9620 :             continue;
     996             : 
     997       13868 :         relkind = get_rel_relkind(relid);
     998       13868 :         if (relkind == RELKIND_RELATION)
     999       13134 :             result = lappend_oid(result, relid);
    1000         734 :         else if (relkind == RELKIND_PARTITIONED_TABLE)
    1001             :         {
    1002         734 :             List       *partitionrels = NIL;
    1003             : 
    1004             :             /*
    1005             :              * It is quite possible that some of the partitions are in a
    1006             :              * different schema than the parent table, so we need to get such
    1007             :              * partitions separately (as selected by pub_partopt).
    1008             :              */
    1009         734 :             partitionrels = GetPubPartitionOptionRelations(partitionrels,
    1010             :                                                            pub_partopt,
    1011             :                                                            relForm->oid);
    1012         734 :             result = list_concat_unique_oid(result, partitionrels);
    1013             :         }
    1014             :     }
    1015             : 
    1016         460 :     table_endscan(scan);
    1017         460 :     table_close(classRel, AccessShareLock);
    1018         460 :     return result;
    1019             : }
    1020             : 
    1021             : /*
    1022             :  * Gets the list of all relations published by a FOR TABLES IN SCHEMA
    1023             :  * publication; per-schema lists are concatenated without de-duplication.
    1024             :  */
    1025             : List *
    1026        1642 : GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
    1027             : {
    1028        1642 :     List       *result = NIL;
    1029        1642 :     List       *pubschemalist = GetPublicationSchemas(pubid);
    1030             :     ListCell   *cell;
    1031             : 
    1032        1712 :     foreach(cell, pubschemalist)
    1033             :     {
    1034          70 :         Oid         schemaid = lfirst_oid(cell);
    1035          70 :         List       *schemaRels = NIL;
    1036             : 
    1037          70 :         schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt);
    1038          70 :         result = list_concat(result, schemaRels);
    1039             :     }
    1040             : 
    1041        1642 :     return result;
    1042             : }
    1043             : 
    1044             : /*
    1045             :  * Get publication using oid; raises an ERROR if the cache lookup fails.
    1046             :  *
    1047             :  * The Publication struct and its name string are palloc'ed here.
    1048             :  */
    1049             : Publication *
    1050        7284 : GetPublication(Oid pubid)
    1051             : {
    1052             :     HeapTuple   tup;
    1053             :     Publication *pub;
    1054             :     Form_pg_publication pubform;
    1055             : 
    1056        7284 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1057        7284 :     if (!HeapTupleIsValid(tup))
    1058           0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
    1059             : 
    1060        7284 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1061             : 
    1062        7284 :     pub = (Publication *) palloc(sizeof(Publication));
    1063        7284 :     pub->oid = pubid;
    1064        7284 :     pub->name = pstrdup(NameStr(pubform->pubname));
    1065        7284 :     pub->alltables = pubform->puballtables;
    1066        7284 :     pub->pubactions.pubinsert = pubform->pubinsert;
    1067        7284 :     pub->pubactions.pubupdate = pubform->pubupdate;
    1068        7284 :     pub->pubactions.pubdelete = pubform->pubdelete;
    1069        7284 :     pub->pubactions.pubtruncate = pubform->pubtruncate;
    1070        7284 :     pub->pubviaroot = pubform->pubviaroot;
    1071        7284 :     pub->pubgencols = pubform->pubgencols;
    1072             : 
    1073        7284 :     ReleaseSysCache(tup);
    1074             : 
    1075        7284 :     return pub;
    1076             : }
    1077             : 
    1078             : /*
    1079             :  * Get Publication using name; NULL if get_publication_oid gives InvalidOid.
    1080             :  */
    1081             : Publication *
    1082        2242 : GetPublicationByName(const char *pubname, bool missing_ok)
    1083             : {
    1084             :     Oid         oid;
    1085             : 
    1086        2242 :     oid = get_publication_oid(pubname, missing_ok);
    1087             : 
    1088        2240 :     return OidIsValid(oid) ? GetPublication(oid) : NULL;
    1089             : }
    1090             : 
    1091             : /*
    1092             :  * Get information of the tables in the given publication array.
    1093             :  *
    1094             :  * Returns pubid, relid, column list, row filter for each table.
    1095             :  */
    1096             : Datum
    1097        5516 : pg_get_publication_tables(PG_FUNCTION_ARGS)
    1098             : {
    1099             : #define NUM_PUBLICATION_TABLES_ELEM 4
    1100             :     FuncCallContext *funcctx;
    1101        5516 :     List       *table_infos = NIL;
    1102             : 
    1103             :     /* stuff done only on the first call of the function */
    1104        5516 :     if (SRF_IS_FIRSTCALL())
    1105             :     {
    1106             :         TupleDesc   tupdesc;
    1107             :         MemoryContext oldcontext;
    1108             :         ArrayType  *arr;
    1109             :         Datum      *elems;
    1110             :         int         nelems,
    1111             :                     i;
    1112        1740 :         bool        viaroot = false;
    1113             : 
    1114             :         /* create a function context for cross-call persistence */
    1115        1740 :         funcctx = SRF_FIRSTCALL_INIT();
    1116             : 
    1117             :         /* switch to memory context appropriate for multiple function calls */
    1118        1740 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    1119             : 
    1120             :         /*
    1121             :          * Deconstruct the parameter into elements where each element is a
    1122             :          * publication name.
    1123             :          */
    1124        1740 :         arr = PG_GETARG_ARRAYTYPE_P(0);
    1125        1740 :         deconstruct_array_builtin(arr, TEXTOID, &elems, NULL, &nelems);
    1126             : 
    1127             :         /* Get Oids of tables from each publication. */
    1128        3566 :         for (i = 0; i < nelems; i++)
    1129             :         {
    1130             :             Publication *pub_elem;
    1131        1826 :             List       *pub_elem_tables = NIL;
    1132             :             ListCell   *lc;
    1133             : 
    1134        1826 :             pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]), false);
    1135             : 
    1136             :             /*
    1137             :              * Publications support partitioned tables. If
    1138             :              * publish_via_partition_root is false, all changes are replicated
    1139             :              * using leaf partition identity and schema, so we only need
    1140             :              * those. Otherwise, get the partitioned table itself.
    1141             :              */
    1142        1826 :             if (pub_elem->alltables)
    1143         280 :                 pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot);
    1144             :             else
    1145             :             {
    1146             :                 List       *relids,
    1147             :                            *schemarelids;
    1148             : 
    1149        1546 :                 relids = GetPublicationRelations(pub_elem->oid,
    1150        1546 :                                                  pub_elem->pubviaroot ?
    1151        1546 :                                                  PUBLICATION_PART_ROOT :
    1152             :                                                  PUBLICATION_PART_LEAF);
    1153        1546 :                 schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
    1154        1546 :                                                                 pub_elem->pubviaroot ?
    1155        1546 :                                                                 PUBLICATION_PART_ROOT :
    1156             :                                                                 PUBLICATION_PART_LEAF);
    1157        1546 :                 pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
    1158             :             }
    1159             : 
    1160             :             /*
    1161             :              * Record the published table and the corresponding publication so
    1162             :              * that we can get row filters and column lists later.
    1163             :              *
    1164             :              * When a table is published by multiple publications, to obtain
    1165             :              * all row filters and column lists, the structure related to this
    1166             :              * table will be recorded multiple times.
    1167             :              */
    1168        5682 :             foreach(lc, pub_elem_tables)
    1169             :             {
    1170        3856 :                 published_rel *table_info = (published_rel *) palloc(sizeof(published_rel));
    1171             : 
    1172        3856 :                 table_info->relid = lfirst_oid(lc);
    1173        3856 :                 table_info->pubid = pub_elem->oid;
    1174        3856 :                 table_infos = lappend(table_infos, table_info);
    1175             :             }
    1176             : 
    1177             :             /* At least one publication is using publish_via_partition_root. */
    1178        1826 :             if (pub_elem->pubviaroot)
    1179         352 :                 viaroot = true;
    1180             :         }
    1181             : 
    1182             :         /*
    1183             :          * If the publication publishes partition changes via their respective
    1184             :          * root partitioned tables, we must exclude partitions in favor of
    1185             :          * including the root partitioned tables. Otherwise, the function
    1186             :          * could return both the child and parent tables which could cause
    1187             :          * data of the child table to be double-published on the subscriber
    1188             :          * side.
    1189             :          */
    1190        1740 :         if (viaroot)
    1191         336 :             filter_partitions(table_infos);
    1192             : 
    1193             :         /* Construct a tuple descriptor for the result rows. */
    1194        1740 :         tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
    1195        1740 :         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
    1196             :                            OIDOID, -1, 0);
    1197        1740 :         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
    1198             :                            OIDOID, -1, 0);
    1199        1740 :         TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
    1200             :                            INT2VECTOROID, -1, 0);
    1201        1740 :         TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
    1202             :                            PG_NODE_TREEOID, -1, 0);
    1203             : 
    1204        1740 :         funcctx->tuple_desc = BlessTupleDesc(tupdesc);
    1205        1740 :         funcctx->user_fctx = (void *) table_infos;
    1206             : 
    1207        1740 :         MemoryContextSwitchTo(oldcontext);
    1208             :     }
    1209             : 
    1210             :     /* stuff done on every call of the function */
    1211        5516 :     funcctx = SRF_PERCALL_SETUP();
    1212        5516 :     table_infos = (List *) funcctx->user_fctx;
    1213             : 
    1214        5516 :     if (funcctx->call_cntr < list_length(table_infos))
    1215             :     {
    1216        3776 :         HeapTuple   pubtuple = NULL;
    1217             :         HeapTuple   rettuple;
    1218             :         Publication *pub;
    1219        3776 :         published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
    1220        3776 :         Oid         relid = table_info->relid;
    1221        3776 :         Oid         schemaid = get_rel_namespace(relid);
    1222        3776 :         Datum       values[NUM_PUBLICATION_TABLES_ELEM] = {0};
    1223        3776 :         bool        nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
    1224             : 
    1225             :         /*
    1226             :          * Form tuple with appropriate data.
    1227             :          */
    1228             : 
    1229        3776 :         pub = GetPublication(table_info->pubid);
    1230             : 
    1231        3776 :         values[0] = ObjectIdGetDatum(pub->oid);
    1232        3776 :         values[1] = ObjectIdGetDatum(relid);
    1233             : 
    1234             :         /*
    1235             :          * We don't consider row filters or column lists for FOR ALL TABLES or
    1236             :          * FOR TABLES IN SCHEMA publications.
    1237             :          */
    1238        3776 :         if (!pub->alltables &&
    1239        2440 :             !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
    1240             :                                    ObjectIdGetDatum(schemaid),
    1241             :                                    ObjectIdGetDatum(pub->oid)))
    1242        2346 :             pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
    1243             :                                            ObjectIdGetDatum(relid),
    1244             :                                            ObjectIdGetDatum(pub->oid));
    1245             : 
    1246        3776 :         if (HeapTupleIsValid(pubtuple))
    1247             :         {
    1248             :             /* Lookup the column list attribute. */
    1249        2128 :             values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
    1250             :                                         Anum_pg_publication_rel_prattrs,
    1251             :                                         &(nulls[2]));
    1252             : 
    1253             :             /* Null indicates no filter. */
    1254        2128 :             values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
    1255             :                                         Anum_pg_publication_rel_prqual,
    1256             :                                         &(nulls[3]));
    1257             :         }
    1258             :         else
    1259             :         {
    1260        1648 :             nulls[2] = true;
    1261        1648 :             nulls[3] = true;
    1262             :         }
    1263             : 
    1264             :         /* Show all columns when the column list is not specified. */
    1265        3776 :         if (nulls[2])
    1266             :         {
    1267        3530 :             Relation    rel = table_open(relid, AccessShareLock);
    1268        3530 :             int         nattnums = 0;
    1269             :             int16      *attnums;
    1270        3530 :             TupleDesc   desc = RelationGetDescr(rel);
    1271             :             int         i;
    1272             : 
    1273        3530 :             attnums = (int16 *) palloc(desc->natts * sizeof(int16));
    1274             : 
    1275        9452 :             for (i = 0; i < desc->natts; i++)
    1276             :             {
    1277        5922 :                 Form_pg_attribute att = TupleDescAttr(desc, i);
    1278             : 
    1279        5922 :                 if (att->attisdropped || (att->attgenerated && !pub->pubgencols))
    1280          40 :                     continue;
    1281             : 
    1282        5882 :                 attnums[nattnums++] = att->attnum;
    1283             :             }
    1284             : 
    1285        3530 :             if (nattnums > 0)
    1286             :             {
    1287        3486 :                 values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
    1288        3486 :                 nulls[2] = false;
    1289             :             }
    1290             : 
    1291        3530 :             table_close(rel, AccessShareLock);
    1292             :         }
    1293             : 
    1294        3776 :         rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
    1295             : 
    1296        3776 :         SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
    1297             :     }
    1298             : 
    1299        1740 :     SRF_RETURN_DONE(funcctx);
    1300             : }

Generated by: LCOV version 1.14