LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_publication.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 440 460 95.7 %
Date: 2025-10-10 16:17:58 Functions: 28 29 96.6 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_publication.c
       4             :  *      publication C API manipulation
       5             :  *
       6             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      src/backend/catalog/pg_publication.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/genam.h"
      18             : #include "access/heapam.h"
      19             : #include "access/htup_details.h"
      20             : #include "access/tableam.h"
      21             : #include "catalog/catalog.h"
      22             : #include "catalog/dependency.h"
      23             : #include "catalog/indexing.h"
      24             : #include "catalog/namespace.h"
      25             : #include "catalog/objectaddress.h"
      26             : #include "catalog/partition.h"
      27             : #include "catalog/pg_inherits.h"
      28             : #include "catalog/pg_namespace.h"
      29             : #include "catalog/pg_publication.h"
      30             : #include "catalog/pg_publication_namespace.h"
      31             : #include "catalog/pg_publication_rel.h"
      32             : #include "catalog/pg_type.h"
      33             : #include "commands/publicationcmds.h"
      34             : #include "funcapi.h"
      35             : #include "utils/array.h"
      36             : #include "utils/builtins.h"
      37             : #include "utils/catcache.h"
      38             : #include "utils/fmgroids.h"
      39             : #include "utils/lsyscache.h"
      40             : #include "utils/rel.h"
      41             : #include "utils/syscache.h"
      42             : 
      43             : /* Records association between publication and published table */
      44             : typedef struct
      45             : {
      46             :     Oid         relid;          /* OID of published table */
      47             :     Oid         pubid;          /* OID of publication that publishes this
      48             :                                  * table. */
      49             : } published_rel;
      50             : 
      51             : /*
      52             :  * Check if relation can be in given publication and throws appropriate
      53             :  * error if not.
      54             :  */
      55             : static void
      56        1128 : check_publication_add_relation(Relation targetrel)
      57             : {
      58             :     /* Must be a regular or partitioned table */
      59        1128 :     if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
      60         144 :         RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
      61          14 :         ereport(ERROR,
      62             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      63             :                  errmsg("cannot add relation \"%s\" to publication",
      64             :                         RelationGetRelationName(targetrel)),
      65             :                  errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind)));
      66             : 
      67             :     /* Can't be system table */
      68        1114 :     if (IsCatalogRelation(targetrel))
      69           6 :         ereport(ERROR,
      70             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      71             :                  errmsg("cannot add relation \"%s\" to publication",
      72             :                         RelationGetRelationName(targetrel)),
      73             :                  errdetail("This operation is not supported for system tables.")));
      74             : 
      75             :     /* UNLOGGED and TEMP relations cannot be part of publication. */
      76        1108 :     if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
      77           6 :         ereport(ERROR,
      78             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      79             :                  errmsg("cannot add relation \"%s\" to publication",
      80             :                         RelationGetRelationName(targetrel)),
      81             :                  errdetail("This operation is not supported for temporary tables.")));
      82        1102 :     else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
      83           6 :         ereport(ERROR,
      84             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      85             :                  errmsg("cannot add relation \"%s\" to publication",
      86             :                         RelationGetRelationName(targetrel)),
      87             :                  errdetail("This operation is not supported for unlogged tables.")));
      88        1096 : }
      89             : 
      90             : /*
      91             :  * Check if schema can be in given publication and throw appropriate error if
      92             :  * not.
      93             :  */
      94             : static void
      95         244 : check_publication_add_schema(Oid schemaid)
      96             : {
      97             :     /* Can't be system namespace */
      98         244 :     if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid))
      99           6 :         ereport(ERROR,
     100             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     101             :                  errmsg("cannot add schema \"%s\" to publication",
     102             :                         get_namespace_name(schemaid)),
     103             :                  errdetail("This operation is not supported for system schemas.")));
     104             : 
     105             :     /* Can't be temporary namespace */
     106         238 :     if (isAnyTempNamespace(schemaid))
     107           0 :         ereport(ERROR,
     108             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     109             :                  errmsg("cannot add schema \"%s\" to publication",
     110             :                         get_namespace_name(schemaid)),
     111             :                  errdetail("Temporary schemas cannot be replicated.")));
     112         238 : }
     113             : 
     114             : /*
     115             :  * Returns if relation represented by oid and Form_pg_class entry
     116             :  * is publishable.
     117             :  *
     118             :  * Does same checks as check_publication_add_relation() above except for
     119             :  * RELKIND_SEQUENCE, but does not need relation to be opened and also does
     120             :  * not throw errors. Here, the additional check is to support ALL SEQUENCES
     121             :  * publication.
     122             :  *
     123             :  * XXX  This also excludes all tables with relid < FirstNormalObjectId,
     124             :  * ie all tables created during initdb.  This mainly affects the preinstalled
     125             :  * information_schema.  IsCatalogRelationOid() only excludes tables with
     126             :  * relid < FirstUnpinnedObjectId, making that test rather redundant,
     127             :  * but really we should get rid of the FirstNormalObjectId test not
     128             :  * IsCatalogRelationOid.  We can't do so today because we don't want
     129             :  * information_schema tables to be considered publishable; but this test
     130             :  * is really inadequate for that, since the information_schema could be
     131             :  * dropped and reloaded and then it'll be considered publishable.  The best
     132             :  * long-term solution may be to add a "relispublishable" bool to pg_class,
     133             :  * and depend on that instead of OID checks.
     134             :  */
     135             : static bool
     136      599306 : is_publishable_class(Oid relid, Form_pg_class reltuple)
     137             : {
     138      611594 :     return (reltuple->relkind == RELKIND_RELATION ||
     139       12288 :             reltuple->relkind == RELKIND_PARTITIONED_TABLE ||
     140       10784 :             reltuple->relkind == RELKIND_SEQUENCE) &&
     141      590554 :         !IsCatalogRelationOid(relid) &&
     142     1198612 :         reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
     143             :         relid >= FirstNormalObjectId;
     144             : }
     145             : 
     146             : /*
     147             :  * Another variant of is_publishable_class(), taking a Relation.
     148             :  */
     149             : bool
     150      542690 : is_publishable_relation(Relation rel)
     151             : {
     152      542690 :     return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
     153             : }
     154             : 
     155             : /*
     156             :  * SQL-callable variant of the above
     157             :  *
     158             :  * This returns null when the relation does not exist.  This is intended to be
     159             :  * used for example in psql to avoid gratuitous errors when there are
     160             :  * concurrent catalog changes.
     161             :  */
     162             : Datum
     163        6260 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
     164             : {
     165        6260 :     Oid         relid = PG_GETARG_OID(0);
     166             :     HeapTuple   tuple;
     167             :     bool        result;
     168             : 
     169        6260 :     tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
     170        6260 :     if (!HeapTupleIsValid(tuple))
     171           0 :         PG_RETURN_NULL();
     172        6260 :     result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
     173        6260 :     ReleaseSysCache(tuple);
     174        6260 :     PG_RETURN_BOOL(result);
     175             : }
     176             : 
     177             : /*
     178             :  * Returns true if the ancestor is in the list of published relations.
     179             :  * Otherwise, returns false.
     180             :  */
     181             : static bool
     182         202 : is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
     183             : {
     184             :     ListCell   *lc;
     185             : 
     186         694 :     foreach(lc, table_infos)
     187             :     {
     188         572 :         Oid         relid = ((published_rel *) lfirst(lc))->relid;
     189             : 
     190         572 :         if (relid == ancestor)
     191          80 :             return true;
     192             :     }
     193             : 
     194         122 :     return false;
     195             : }
     196             : 
     197             : /*
     198             :  * Filter out the partitions whose parent tables are also present in the list.
     199             :  */
     200             : static void
     201         352 : filter_partitions(List *table_infos)
     202             : {
     203             :     ListCell   *lc;
     204             : 
     205        1034 :     foreach(lc, table_infos)
     206             :     {
     207         682 :         bool        skip = false;
     208         682 :         List       *ancestors = NIL;
     209             :         ListCell   *lc2;
     210         682 :         published_rel *table_info = (published_rel *) lfirst(lc);
     211             : 
     212         682 :         if (get_rel_relispartition(table_info->relid))
     213         202 :             ancestors = get_partition_ancestors(table_info->relid);
     214             : 
     215         804 :         foreach(lc2, ancestors)
     216             :         {
     217         202 :             Oid         ancestor = lfirst_oid(lc2);
     218             : 
     219         202 :             if (is_ancestor_member_tableinfos(ancestor, table_infos))
     220             :             {
     221          80 :                 skip = true;
     222          80 :                 break;
     223             :             }
     224             :         }
     225             : 
     226         682 :         if (skip)
     227          80 :             table_infos = foreach_delete_current(table_infos, lc);
     228             :     }
     229         352 : }
     230             : 
     231             : /*
     232             :  * Returns true if any schema is associated with the publication, false if no
     233             :  * schema is associated with the publication.
     234             :  */
     235             : bool
     236         284 : is_schema_publication(Oid pubid)
     237             : {
     238             :     Relation    pubschsrel;
     239             :     ScanKeyData scankey;
     240             :     SysScanDesc scan;
     241             :     HeapTuple   tup;
     242         284 :     bool        result = false;
     243             : 
     244         284 :     pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
     245         284 :     ScanKeyInit(&scankey,
     246             :                 Anum_pg_publication_namespace_pnpubid,
     247             :                 BTEqualStrategyNumber, F_OIDEQ,
     248             :                 ObjectIdGetDatum(pubid));
     249             : 
     250         284 :     scan = systable_beginscan(pubschsrel,
     251             :                               PublicationNamespacePnnspidPnpubidIndexId,
     252             :                               true, NULL, 1, &scankey);
     253         284 :     tup = systable_getnext(scan);
     254         284 :     result = HeapTupleIsValid(tup);
     255             : 
     256         284 :     systable_endscan(scan);
     257         284 :     table_close(pubschsrel, AccessShareLock);
     258             : 
     259         284 :     return result;
     260             : }
     261             : 
     262             : /*
     263             :  * Returns true if the relation has column list associated with the
     264             :  * publication, false otherwise.
     265             :  *
     266             :  * If a column list is found, the corresponding bitmap is returned through the
     267             :  * cols parameter, if provided. The bitmap is constructed within the given
     268             :  * memory context (mcxt).
     269             :  */
     270             : bool
     271        1586 : check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt,
     272             :                             Bitmapset **cols)
     273             : {
     274             :     HeapTuple   cftuple;
     275        1586 :     bool        found = false;
     276             : 
     277        1586 :     if (pub->alltables)
     278         386 :         return false;
     279             : 
     280        1200 :     cftuple = SearchSysCache2(PUBLICATIONRELMAP,
     281             :                               ObjectIdGetDatum(relid),
     282             :                               ObjectIdGetDatum(pub->oid));
     283        1200 :     if (HeapTupleIsValid(cftuple))
     284             :     {
     285             :         Datum       cfdatum;
     286             :         bool        isnull;
     287             : 
     288             :         /* Lookup the column list attribute. */
     289        1098 :         cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
     290             :                                   Anum_pg_publication_rel_prattrs, &isnull);
     291             : 
     292             :         /* Was a column list found? */
     293        1098 :         if (!isnull)
     294             :         {
     295             :             /* Build the column list bitmap in the given memory context. */
     296         314 :             if (cols)
     297         308 :                 *cols = pub_collist_to_bitmapset(*cols, cfdatum, mcxt);
     298             : 
     299         314 :             found = true;
     300             :         }
     301             : 
     302        1098 :         ReleaseSysCache(cftuple);
     303             :     }
     304             : 
     305        1200 :     return found;
     306             : }
     307             : 
     308             : /*
     309             :  * Gets the relations based on the publication partition option for a specified
     310             :  * relation.
     311             :  */
     312             : List *
     313        5830 : GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
     314             :                                Oid relid)
     315             : {
     316        5830 :     if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
     317             :         pub_partopt != PUBLICATION_PART_ROOT)
     318        1188 :     {
     319        1188 :         List       *all_parts = find_all_inheritors(relid, NoLock,
     320             :                                                     NULL);
     321             : 
     322        1188 :         if (pub_partopt == PUBLICATION_PART_ALL)
     323         986 :             result = list_concat(result, all_parts);
     324         202 :         else if (pub_partopt == PUBLICATION_PART_LEAF)
     325             :         {
     326             :             ListCell   *lc;
     327             : 
     328         738 :             foreach(lc, all_parts)
     329             :             {
     330         536 :                 Oid         partOid = lfirst_oid(lc);
     331             : 
     332         536 :                 if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
     333         334 :                     result = lappend_oid(result, partOid);
     334             :             }
     335             :         }
     336             :         else
     337             :             Assert(false);
     338             :     }
     339             :     else
     340        4642 :         result = lappend_oid(result, relid);
     341             : 
     342        5830 :     return result;
     343             : }
     344             : 
     345             : /*
     346             :  * Returns the relid of the topmost ancestor that is published via this
     347             :  * publication if any and set its ancestor level to ancestor_level,
     348             :  * otherwise returns InvalidOid.
     349             :  *
     350             :  * The ancestor_level value allows us to compare the results for multiple
     351             :  * publications, and decide which value is higher up.
     352             :  *
     353             :  * Note that the list of ancestors should be ordered such that the topmost
     354             :  * ancestor is at the end of the list.
     355             :  */
     356             : Oid
     357         510 : GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
     358             : {
     359             :     ListCell   *lc;
     360         510 :     Oid         topmost_relid = InvalidOid;
     361         510 :     int         level = 0;
     362             : 
     363             :     /*
     364             :      * Find the "topmost" ancestor that is in this publication.
     365             :      */
     366        1036 :     foreach(lc, ancestors)
     367             :     {
     368         526 :         Oid         ancestor = lfirst_oid(lc);
     369         526 :         List       *apubids = GetRelationPublications(ancestor);
     370         526 :         List       *aschemaPubids = NIL;
     371             : 
     372         526 :         level++;
     373             : 
     374         526 :         if (list_member_oid(apubids, puboid))
     375             :         {
     376         312 :             topmost_relid = ancestor;
     377             : 
     378         312 :             if (ancestor_level)
     379          86 :                 *ancestor_level = level;
     380             :         }
     381             :         else
     382             :         {
     383         214 :             aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
     384         214 :             if (list_member_oid(aschemaPubids, puboid))
     385             :             {
     386          10 :                 topmost_relid = ancestor;
     387             : 
     388          10 :                 if (ancestor_level)
     389          10 :                     *ancestor_level = level;
     390             :             }
     391             :         }
     392             : 
     393         526 :         list_free(apubids);
     394         526 :         list_free(aschemaPubids);
     395             :     }
     396             : 
     397         510 :     return topmost_relid;
     398             : }
     399             : 
     400             : /*
     401             :  * attnumstoint2vector
     402             :  *      Convert a Bitmapset of AttrNumbers into an int2vector.
     403             :  *
     404             :  * AttrNumber numbers are 0-based, i.e., not offset by
     405             :  * FirstLowInvalidHeapAttributeNumber.
     406             :  */
     407             : static int2vector *
     408         332 : attnumstoint2vector(Bitmapset *attrs)
     409             : {
     410             :     int2vector *result;
     411         332 :     int         n = bms_num_members(attrs);
     412         332 :     int         i = -1;
     413         332 :     int         j = 0;
     414             : 
     415         332 :     result = buildint2vector(NULL, n);
     416             : 
     417         904 :     while ((i = bms_next_member(attrs, i)) >= 0)
     418             :     {
     419             :         Assert(i <= PG_INT16_MAX);
     420             : 
     421         572 :         result->values[j++] = (int16) i;
     422             :     }
     423             : 
     424         332 :     return result;
     425             : }
     426             : 
     427             : /*
     428             :  * Insert new publication / relation mapping.
     429             :  */
     430             : ObjectAddress
     431        1150 : publication_add_relation(Oid pubid, PublicationRelInfo *pri,
     432             :                          bool if_not_exists)
     433             : {
     434             :     Relation    rel;
     435             :     HeapTuple   tup;
     436             :     Datum       values[Natts_pg_publication_rel];
     437             :     bool        nulls[Natts_pg_publication_rel];
     438        1150 :     Relation    targetrel = pri->relation;
     439        1150 :     Oid         relid = RelationGetRelid(targetrel);
     440             :     Oid         pubreloid;
     441             :     Bitmapset  *attnums;
     442        1150 :     Publication *pub = GetPublication(pubid);
     443             :     ObjectAddress myself,
     444             :                 referenced;
     445        1150 :     List       *relids = NIL;
     446             :     int         i;
     447             : 
     448        1150 :     rel = table_open(PublicationRelRelationId, RowExclusiveLock);
     449             : 
     450             :     /*
     451             :      * Check for duplicates. Note that this does not really prevent
     452             :      * duplicates, it's here just to provide nicer error message in common
     453             :      * case. The real protection is the unique key on the catalog.
     454             :      */
     455        1150 :     if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
     456             :                               ObjectIdGetDatum(pubid)))
     457             :     {
     458          22 :         table_close(rel, RowExclusiveLock);
     459             : 
     460          22 :         if (if_not_exists)
     461          16 :             return InvalidObjectAddress;
     462             : 
     463           6 :         ereport(ERROR,
     464             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     465             :                  errmsg("relation \"%s\" is already member of publication \"%s\"",
     466             :                         RelationGetRelationName(targetrel), pub->name)));
     467             :     }
     468             : 
     469        1128 :     check_publication_add_relation(targetrel);
     470             : 
     471             :     /* Validate and translate column names into a Bitmapset of attnums. */
     472        1096 :     attnums = pub_collist_validate(pri->relation, pri->columns);
     473             : 
     474             :     /* Form a tuple. */
     475        1072 :     memset(values, 0, sizeof(values));
     476        1072 :     memset(nulls, false, sizeof(nulls));
     477             : 
     478        1072 :     pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
     479             :                                    Anum_pg_publication_rel_oid);
     480        1072 :     values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid);
     481        1072 :     values[Anum_pg_publication_rel_prpubid - 1] =
     482        1072 :         ObjectIdGetDatum(pubid);
     483        1072 :     values[Anum_pg_publication_rel_prrelid - 1] =
     484        1072 :         ObjectIdGetDatum(relid);
     485             : 
     486             :     /* Add qualifications, if available */
     487        1072 :     if (pri->whereClause != NULL)
     488         330 :         values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
     489             :     else
     490         742 :         nulls[Anum_pg_publication_rel_prqual - 1] = true;
     491             : 
     492             :     /* Add column list, if available */
     493        1072 :     if (pri->columns)
     494         332 :         values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(attnumstoint2vector(attnums));
     495             :     else
     496         740 :         nulls[Anum_pg_publication_rel_prattrs - 1] = true;
     497             : 
     498        1072 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     499             : 
     500             :     /* Insert tuple into catalog. */
     501        1072 :     CatalogTupleInsert(rel, tup);
     502        1072 :     heap_freetuple(tup);
     503             : 
     504             :     /* Register dependencies as needed */
     505        1072 :     ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
     506             : 
     507             :     /* Add dependency on the publication */
     508        1072 :     ObjectAddressSet(referenced, PublicationRelationId, pubid);
     509        1072 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     510             : 
     511             :     /* Add dependency on the relation */
     512        1072 :     ObjectAddressSet(referenced, RelationRelationId, relid);
     513        1072 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     514             : 
     515             :     /* Add dependency on the objects mentioned in the qualifications */
     516        1072 :     if (pri->whereClause)
     517         330 :         recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
     518             :                                         DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
     519             :                                         false);
     520             : 
     521             :     /* Add dependency on the columns, if any are listed */
     522        1072 :     i = -1;
     523        1644 :     while ((i = bms_next_member(attnums, i)) >= 0)
     524             :     {
     525         572 :         ObjectAddressSubSet(referenced, RelationRelationId, relid, i);
     526         572 :         recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
     527             :     }
     528             : 
     529             :     /* Close the table. */
     530        1072 :     table_close(rel, RowExclusiveLock);
     531             : 
     532             :     /*
     533             :      * Invalidate relcache so that publication info is rebuilt.
     534             :      *
     535             :      * For the partitioned tables, we must invalidate all partitions contained
     536             :      * in the respective partition hierarchies, not just the one explicitly
     537             :      * mentioned in the publication. This is required because we implicitly
     538             :      * publish the child tables when the parent table is published.
     539             :      */
     540        1072 :     relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
     541             :                                             relid);
     542             : 
     543        1072 :     InvalidatePublicationRels(relids);
     544             : 
     545        1072 :     return myself;
     546             : }
     547             : 
     548             : /*
     549             :  * pub_collist_validate
     550             :  *      Process and validate the 'columns' list and ensure the columns are all
     551             :  *      valid to use for a publication.  Checks for and raises an ERROR for
     552             :  *      any unknown columns, system columns, duplicate columns, or virtual
     553             :  *      generated columns.
     554             :  *
     555             :  * Looks up each column's attnum and returns a 0-based Bitmapset of the
     556             :  * corresponding attnums.
     557             :  */
     558             : Bitmapset *
     559        1500 : pub_collist_validate(Relation targetrel, List *columns)
     560             : {
     561        1500 :     Bitmapset  *set = NULL;
     562             :     ListCell   *lc;
     563        1500 :     TupleDesc   tupdesc = RelationGetDescr(targetrel);
     564             : 
     565        2344 :     foreach(lc, columns)
     566             :     {
     567         880 :         char       *colname = strVal(lfirst(lc));
     568         880 :         AttrNumber  attnum = get_attnum(RelationGetRelid(targetrel), colname);
     569             : 
     570         880 :         if (attnum == InvalidAttrNumber)
     571           6 :             ereport(ERROR,
     572             :                     errcode(ERRCODE_UNDEFINED_COLUMN),
     573             :                     errmsg("column \"%s\" of relation \"%s\" does not exist",
     574             :                            colname, RelationGetRelationName(targetrel)));
     575             : 
     576         874 :         if (!AttrNumberIsForUserDefinedAttr(attnum))
     577          12 :             ereport(ERROR,
     578             :                     errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     579             :                     errmsg("cannot use system column \"%s\" in publication column list",
     580             :                            colname));
     581             : 
     582         862 :         if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
     583           6 :             ereport(ERROR,
     584             :                     errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     585             :                     errmsg("cannot use virtual generated column \"%s\" in publication column list",
     586             :                            colname));
     587             : 
     588         856 :         if (bms_is_member(attnum, set))
     589          12 :             ereport(ERROR,
     590             :                     errcode(ERRCODE_DUPLICATE_OBJECT),
     591             :                     errmsg("duplicate column \"%s\" in publication column list",
     592             :                            colname));
     593             : 
     594         844 :         set = bms_add_member(set, attnum);
     595             :     }
     596             : 
     597        1464 :     return set;
     598             : }
     599             : 
     600             : /*
     601             :  * Transform a column list (represented by an array Datum) to a bitmapset.
     602             :  *
     603             :  * If columns isn't NULL, add the column numbers to that set.
     604             :  *
     605             :  * If mcxt isn't NULL, build the bitmapset in that context.
     606             :  */
     607             : Bitmapset *
     608         444 : pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
     609             : {
     610         444 :     Bitmapset  *result = columns;
     611             :     ArrayType  *arr;
     612             :     int         nelems;
     613             :     int16      *elems;
     614         444 :     MemoryContext oldcxt = NULL;
     615             : 
     616         444 :     arr = DatumGetArrayTypeP(pubcols);
     617         444 :     nelems = ARR_DIMS(arr)[0];
     618         444 :     elems = (int16 *) ARR_DATA_PTR(arr);
     619             : 
     620             :     /* If a memory context was specified, switch to it. */
     621         444 :     if (mcxt)
     622          78 :         oldcxt = MemoryContextSwitchTo(mcxt);
     623             : 
     624        1226 :     for (int i = 0; i < nelems; i++)
     625         782 :         result = bms_add_member(result, elems[i]);
     626             : 
     627         444 :     if (mcxt)
     628          78 :         MemoryContextSwitchTo(oldcxt);
     629             : 
     630         444 :     return result;
     631             : }
     632             : 
     633             : /*
     634             :  * Returns a bitmap representing the columns of the specified table.
     635             :  *
     636             :  * Generated columns are included if include_gencols_type is
     637             :  * PUBLISH_GENCOLS_STORED.
     638             :  */
     639             : Bitmapset *
     640          18 : pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type)
     641             : {
     642          18 :     Bitmapset  *result = NULL;
     643          18 :     TupleDesc   desc = RelationGetDescr(relation);
     644             : 
     645          60 :     for (int i = 0; i < desc->natts; i++)
     646             :     {
     647          42 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     648             : 
     649          42 :         if (att->attisdropped)
     650           2 :             continue;
     651             : 
     652          40 :         if (att->attgenerated)
     653             :         {
     654             :             /* We only support replication of STORED generated cols. */
     655           4 :             if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
     656           2 :                 continue;
     657             : 
     658             :             /* User hasn't requested to replicate STORED generated cols. */
     659           2 :             if (include_gencols_type != PUBLISH_GENCOLS_STORED)
     660           2 :                 continue;
     661             :         }
     662             : 
     663          36 :         result = bms_add_member(result, att->attnum);
     664             :     }
     665             : 
     666          18 :     return result;
     667             : }
     668             : 
     669             : /*
     670             :  * Insert new publication / schema mapping.
     671             :  */
     672             : ObjectAddress
     673         262 : publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
     674             : {
     675             :     Relation    rel;
     676             :     HeapTuple   tup;
     677             :     Datum       values[Natts_pg_publication_namespace];
     678             :     bool        nulls[Natts_pg_publication_namespace];
     679             :     Oid         psschid;
     680         262 :     Publication *pub = GetPublication(pubid);
     681         262 :     List       *schemaRels = NIL;
     682             :     ObjectAddress myself,
     683             :                 referenced;
     684             : 
     685         262 :     rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
     686             : 
     687             :     /*
     688             :      * Check for duplicates. Note that this does not really prevent
     689             :      * duplicates, it's here just to provide nicer error message in common
     690             :      * case. The real protection is the unique key on the catalog.
     691             :      */
     692         262 :     if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
     693             :                               ObjectIdGetDatum(schemaid),
     694             :                               ObjectIdGetDatum(pubid)))
     695             :     {
     696          18 :         table_close(rel, RowExclusiveLock);
     697             : 
     698          18 :         if (if_not_exists)
     699          12 :             return InvalidObjectAddress;
     700             : 
     701           6 :         ereport(ERROR,
     702             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     703             :                  errmsg("schema \"%s\" is already member of publication \"%s\"",
     704             :                         get_namespace_name(schemaid), pub->name)));
     705             :     }
     706             : 
     707         244 :     check_publication_add_schema(schemaid);
     708             : 
     709             :     /* Form a tuple */
     710         238 :     memset(values, 0, sizeof(values));
     711         238 :     memset(nulls, false, sizeof(nulls));
     712             : 
     713         238 :     psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId,
     714             :                                  Anum_pg_publication_namespace_oid);
     715         238 :     values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid);
     716         238 :     values[Anum_pg_publication_namespace_pnpubid - 1] =
     717         238 :         ObjectIdGetDatum(pubid);
     718         238 :     values[Anum_pg_publication_namespace_pnnspid - 1] =
     719         238 :         ObjectIdGetDatum(schemaid);
     720             : 
     721         238 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     722             : 
     723             :     /* Insert tuple into catalog */
     724         238 :     CatalogTupleInsert(rel, tup);
     725         238 :     heap_freetuple(tup);
     726             : 
     727         238 :     ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid);
     728             : 
     729             :     /* Add dependency on the publication */
     730         238 :     ObjectAddressSet(referenced, PublicationRelationId, pubid);
     731         238 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     732             : 
     733             :     /* Add dependency on the schema */
     734         238 :     ObjectAddressSet(referenced, NamespaceRelationId, schemaid);
     735         238 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     736             : 
     737             :     /* Close the table */
     738         238 :     table_close(rel, RowExclusiveLock);
     739             : 
     740             :     /*
     741             :      * Invalidate relcache so that publication info is rebuilt. See
     742             :      * publication_add_relation for why we need to consider all the
     743             :      * partitions.
     744             :      */
     745         238 :     schemaRels = GetSchemaPublicationRelations(schemaid,
     746             :                                                PUBLICATION_PART_ALL);
     747         238 :     InvalidatePublicationRels(schemaRels);
     748             : 
     749         238 :     return myself;
     750             : }
     751             : 
     752             : /* Gets list of publication oids for a relation */
     753             : List *
     754       13386 : GetRelationPublications(Oid relid)
     755             : {
     756       13386 :     List       *result = NIL;
     757             :     CatCList   *pubrellist;
     758             :     int         i;
     759             : 
     760             :     /* Find all publications associated with the relation. */
     761       13386 :     pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
     762             :                                      ObjectIdGetDatum(relid));
     763       15038 :     for (i = 0; i < pubrellist->n_members; i++)
     764             :     {
     765        1652 :         HeapTuple   tup = &pubrellist->members[i]->tuple;
     766        1652 :         Oid         pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
     767             : 
     768        1652 :         result = lappend_oid(result, pubid);
     769             :     }
     770             : 
     771       13386 :     ReleaseSysCacheList(pubrellist);
     772             : 
     773       13386 :     return result;
     774             : }
     775             : 
     776             : /*
     777             :  * Gets list of relation oids for a publication.
     778             :  *
     779             :  * This should only be used FOR TABLE publications, the FOR ALL TABLES/SEQUENCES
     780             :  * should use GetAllPublicationRelations().
     781             :  */
     782             : List *
     783        2354 : GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
     784             : {
     785             :     List       *result;
     786             :     Relation    pubrelsrel;
     787             :     ScanKeyData scankey;
     788             :     SysScanDesc scan;
     789             :     HeapTuple   tup;
     790             : 
     791             :     /* Find all publications associated with the relation. */
     792        2354 :     pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
     793             : 
     794        2354 :     ScanKeyInit(&scankey,
     795             :                 Anum_pg_publication_rel_prpubid,
     796             :                 BTEqualStrategyNumber, F_OIDEQ,
     797             :                 ObjectIdGetDatum(pubid));
     798             : 
     799        2354 :     scan = systable_beginscan(pubrelsrel, PublicationRelPrpubidIndexId,
     800             :                               true, NULL, 1, &scankey);
     801             : 
     802        2354 :     result = NIL;
     803        5448 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     804             :     {
     805             :         Form_pg_publication_rel pubrel;
     806             : 
     807        3094 :         pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
     808        3094 :         result = GetPubPartitionOptionRelations(result, pub_partopt,
     809             :                                                 pubrel->prrelid);
     810             :     }
     811             : 
     812        2354 :     systable_endscan(scan);
     813        2354 :     table_close(pubrelsrel, AccessShareLock);
     814             : 
     815             :     /* Now sort and de-duplicate the result list */
     816        2354 :     list_sort(result, list_oid_cmp);
     817        2354 :     list_deduplicate_oid(result);
     818             : 
     819        2354 :     return result;
     820             : }
     821             : 
     822             : /*
     823             :  * Gets list of publication oids for publications marked as FOR ALL TABLES.
     824             :  */
     825             : List *
     826        9064 : GetAllTablesPublications(void)
     827             : {
     828             :     List       *result;
     829             :     Relation    rel;
     830             :     ScanKeyData scankey;
     831             :     SysScanDesc scan;
     832             :     HeapTuple   tup;
     833             : 
     834             :     /* Find all publications that are marked as for all tables. */
     835        9064 :     rel = table_open(PublicationRelationId, AccessShareLock);
     836             : 
     837        9064 :     ScanKeyInit(&scankey,
     838             :                 Anum_pg_publication_puballtables,
     839             :                 BTEqualStrategyNumber, F_BOOLEQ,
     840             :                 BoolGetDatum(true));
     841             : 
     842        9064 :     scan = systable_beginscan(rel, InvalidOid, false,
     843             :                               NULL, 1, &scankey);
     844             : 
     845        9064 :     result = NIL;
     846        9280 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     847             :     {
     848         216 :         Oid         oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
     849             : 
     850         216 :         result = lappend_oid(result, oid);
     851             :     }
     852             : 
     853        9064 :     systable_endscan(scan);
     854        9064 :     table_close(rel, AccessShareLock);
     855             : 
     856        9064 :     return result;
     857             : }
     858             : 
     859             : /*
     860             :  * Gets list of all relations published by FOR ALL TABLES/SEQUENCES
     861             :  * publication(s).
     862             :  *
     863             :  * If the publication publishes partition changes via their respective root
     864             :  * partitioned tables, we must exclude partitions in favor of including the
     865             :  * root partitioned tables. This is not applicable to FOR ALL SEQUENCES
     866             :  * publication.
     867             :  */
     868             : List *
     869         344 : GetAllPublicationRelations(char relkind, bool pubviaroot)
     870             : {
     871             :     Relation    classRel;
     872             :     ScanKeyData key[1];
     873             :     TableScanDesc scan;
     874             :     HeapTuple   tuple;
     875         344 :     List       *result = NIL;
     876             : 
     877             :     Assert(!(relkind == RELKIND_SEQUENCE && pubviaroot));
     878             : 
     879         344 :     classRel = table_open(RelationRelationId, AccessShareLock);
     880             : 
     881         344 :     ScanKeyInit(&key[0],
     882             :                 Anum_pg_class_relkind,
     883             :                 BTEqualStrategyNumber, F_CHAREQ,
     884             :                 CharGetDatum(relkind));
     885             : 
     886         344 :     scan = table_beginscan_catalog(classRel, 1, key);
     887             : 
     888       25338 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
     889             :     {
     890       24994 :         Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
     891       24994 :         Oid         relid = relForm->oid;
     892             : 
     893       24994 :         if (is_publishable_class(relid, relForm) &&
     894        1590 :             !(relForm->relispartition && pubviaroot))
     895        1408 :             result = lappend_oid(result, relid);
     896             :     }
     897             : 
     898         344 :     table_endscan(scan);
     899             : 
     900         344 :     if (pubviaroot)
     901             :     {
     902          26 :         ScanKeyInit(&key[0],
     903             :                     Anum_pg_class_relkind,
     904             :                     BTEqualStrategyNumber, F_CHAREQ,
     905             :                     CharGetDatum(RELKIND_PARTITIONED_TABLE));
     906             : 
     907          26 :         scan = table_beginscan_catalog(classRel, 1, key);
     908             : 
     909         156 :         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
     910             :         {
     911         130 :             Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
     912         130 :             Oid         relid = relForm->oid;
     913             : 
     914         130 :             if (is_publishable_class(relid, relForm) &&
     915         130 :                 !relForm->relispartition)
     916         104 :                 result = lappend_oid(result, relid);
     917             :         }
     918             : 
     919          26 :         table_endscan(scan);
     920             :     }
     921             : 
     922         344 :     table_close(classRel, AccessShareLock);
     923         344 :     return result;
     924             : }
     925             : 
     926             : /*
     927             :  * Gets the list of schema oids for a publication.
     928             :  *
     929             :  * This should only be used FOR TABLES IN SCHEMA publications.
     930             :  */
     931             : List *
     932        2262 : GetPublicationSchemas(Oid pubid)
     933             : {
     934        2262 :     List       *result = NIL;
     935             :     Relation    pubschsrel;
     936             :     ScanKeyData scankey;
     937             :     SysScanDesc scan;
     938             :     HeapTuple   tup;
     939             : 
     940             :     /* Find all schemas associated with the publication */
     941        2262 :     pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
     942             : 
     943        2262 :     ScanKeyInit(&scankey,
     944             :                 Anum_pg_publication_namespace_pnpubid,
     945             :                 BTEqualStrategyNumber, F_OIDEQ,
     946             :                 ObjectIdGetDatum(pubid));
     947             : 
     948        2262 :     scan = systable_beginscan(pubschsrel,
     949             :                               PublicationNamespacePnnspidPnpubidIndexId,
     950             :                               true, NULL, 1, &scankey);
     951        2362 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     952             :     {
     953             :         Form_pg_publication_namespace pubsch;
     954             : 
     955         100 :         pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
     956             : 
     957         100 :         result = lappend_oid(result, pubsch->pnnspid);
     958             :     }
     959             : 
     960        2262 :     systable_endscan(scan);
     961        2262 :     table_close(pubschsrel, AccessShareLock);
     962             : 
     963        2262 :     return result;
     964             : }
     965             : 
     966             : /*
     967             :  * Gets the list of publication oids associated with a specified schema.
     968             :  */
     969             : List *
     970       12928 : GetSchemaPublications(Oid schemaid)
     971             : {
     972       12928 :     List       *result = NIL;
     973             :     CatCList   *pubschlist;
     974             :     int         i;
     975             : 
     976             :     /* Find all publications associated with the schema */
     977       12928 :     pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
     978             :                                      ObjectIdGetDatum(schemaid));
     979       13042 :     for (i = 0; i < pubschlist->n_members; i++)
     980             :     {
     981         114 :         HeapTuple   tup = &pubschlist->members[i]->tuple;
     982         114 :         Oid         pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
     983             : 
     984         114 :         result = lappend_oid(result, pubid);
     985             :     }
     986             : 
     987       12928 :     ReleaseSysCacheList(pubschlist);
     988             : 
     989       12928 :     return result;
     990             : }
     991             : 
     992             : /*
     993             :  * Get the list of publishable relation oids for a specified schema.
     994             :  */
     995             : List *
     996         500 : GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
     997             : {
     998             :     Relation    classRel;
     999             :     ScanKeyData key[1];
    1000             :     TableScanDesc scan;
    1001             :     HeapTuple   tuple;
    1002         500 :     List       *result = NIL;
    1003             : 
    1004             :     Assert(OidIsValid(schemaid));
    1005             : 
    1006         500 :     classRel = table_open(RelationRelationId, AccessShareLock);
    1007             : 
    1008         500 :     ScanKeyInit(&key[0],
    1009             :                 Anum_pg_class_relnamespace,
    1010             :                 BTEqualStrategyNumber, F_OIDEQ,
    1011             :                 ObjectIdGetDatum(schemaid));
    1012             : 
    1013             :     /* get all the relations present in the specified schema */
    1014         500 :     scan = table_beginscan_catalog(classRel, 1, key);
    1015       25732 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    1016             :     {
    1017       25232 :         Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
    1018       25232 :         Oid         relid = relForm->oid;
    1019             :         char        relkind;
    1020             : 
    1021       25232 :         if (!is_publishable_class(relid, relForm))
    1022        8656 :             continue;
    1023             : 
    1024       16576 :         relkind = get_rel_relkind(relid);
    1025       16576 :         if (relkind == RELKIND_RELATION)
    1026       14206 :             result = lappend_oid(result, relid);
    1027        2370 :         else if (relkind == RELKIND_PARTITIONED_TABLE)
    1028             :         {
    1029         782 :             List       *partitionrels = NIL;
    1030             : 
    1031             :             /*
    1032             :              * It is quite possible that some of the partitions are in a
    1033             :              * different schema than the parent table, so we need to get such
    1034             :              * partitions separately.
    1035             :              */
    1036         782 :             partitionrels = GetPubPartitionOptionRelations(partitionrels,
    1037             :                                                            pub_partopt,
    1038             :                                                            relForm->oid);
    1039         782 :             result = list_concat_unique_oid(result, partitionrels);
    1040             :         }
    1041             :     }
    1042             : 
    1043         500 :     table_endscan(scan);
    1044         500 :     table_close(classRel, AccessShareLock);
    1045         500 :     return result;
    1046             : }
    1047             : 
    1048             : /*
    1049             :  * Gets the list of all relations published by FOR TABLES IN SCHEMA
    1050             :  * publication.
    1051             :  */
    1052             : List *
    1053        1858 : GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
    1054             : {
    1055        1858 :     List       *result = NIL;
    1056        1858 :     List       *pubschemalist = GetPublicationSchemas(pubid);
    1057             :     ListCell   *cell;
    1058             : 
    1059        1928 :     foreach(cell, pubschemalist)
    1060             :     {
    1061          70 :         Oid         schemaid = lfirst_oid(cell);
    1062          70 :         List       *schemaRels = NIL;
    1063             : 
    1064          70 :         schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt);
    1065          70 :         result = list_concat(result, schemaRels);
    1066             :     }
    1067             : 
    1068        1858 :     return result;
    1069             : }
    1070             : 
    1071             : /*
    1072             :  * Get publication using oid
    1073             :  *
    1074             :  * The Publication struct and its data are palloc'ed here.
    1075             :  */
    1076             : Publication *
    1077        9082 : GetPublication(Oid pubid)
    1078             : {
    1079             :     HeapTuple   tup;
    1080             :     Publication *pub;
    1081             :     Form_pg_publication pubform;
    1082             : 
    1083        9082 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1084        9082 :     if (!HeapTupleIsValid(tup))
    1085           0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
    1086             : 
    1087        9082 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1088             : 
    1089        9082 :     pub = (Publication *) palloc(sizeof(Publication));
    1090        9082 :     pub->oid = pubid;
    1091        9082 :     pub->name = pstrdup(NameStr(pubform->pubname));
    1092        9082 :     pub->alltables = pubform->puballtables;
    1093        9082 :     pub->allsequences = pubform->puballsequences;
    1094        9082 :     pub->pubactions.pubinsert = pubform->pubinsert;
    1095        9082 :     pub->pubactions.pubupdate = pubform->pubupdate;
    1096        9082 :     pub->pubactions.pubdelete = pubform->pubdelete;
    1097        9082 :     pub->pubactions.pubtruncate = pubform->pubtruncate;
    1098        9082 :     pub->pubviaroot = pubform->pubviaroot;
    1099        9082 :     pub->pubgencols_type = pubform->pubgencols;
    1100             : 
    1101        9082 :     ReleaseSysCache(tup);
    1102             : 
    1103        9082 :     return pub;
    1104             : }
    1105             : 
    1106             : /*
    1107             :  * Get Publication using name.
    1108             :  */
    1109             : Publication *
    1110        2586 : GetPublicationByName(const char *pubname, bool missing_ok)
    1111             : {
    1112             :     Oid         oid;
    1113             : 
    1114        2586 :     oid = get_publication_oid(pubname, missing_ok);
    1115             : 
    1116        2586 :     return OidIsValid(oid) ? GetPublication(oid) : NULL;
    1117             : }
    1118             : 
    1119             : /*
    1120             :  * Get information of the tables in the given publication array.
    1121             :  *
    1122             :  * Returns pubid, relid, column list, row filter for each table.
    1123             :  */
    1124             : Datum
    1125        6132 : pg_get_publication_tables(PG_FUNCTION_ARGS)
    1126             : {
    1127             : #define NUM_PUBLICATION_TABLES_ELEM 4
    1128             :     FuncCallContext *funcctx;
    1129        6132 :     List       *table_infos = NIL;
    1130             : 
    1131             :     /* stuff done only on the first call of the function */
    1132        6132 :     if (SRF_IS_FIRSTCALL())
    1133             :     {
    1134             :         TupleDesc   tupdesc;
    1135             :         MemoryContext oldcontext;
    1136             :         ArrayType  *arr;
    1137             :         Datum      *elems;
    1138             :         int         nelems,
    1139             :                     i;
    1140        1986 :         bool        viaroot = false;
    1141             : 
    1142             :         /* create a function context for cross-call persistence */
    1143        1986 :         funcctx = SRF_FIRSTCALL_INIT();
    1144             : 
    1145             :         /* switch to memory context appropriate for multiple function calls */
    1146        1986 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    1147             : 
    1148             :         /*
    1149             :          * Deconstruct the parameter into elements where each element is a
    1150             :          * publication name.
    1151             :          */
    1152        1986 :         arr = PG_GETARG_ARRAYTYPE_P(0);
    1153        1986 :         deconstruct_array_builtin(arr, TEXTOID, &elems, NULL, &nelems);
    1154             : 
    1155             :         /* Get Oids of tables from each publication. */
    1156        4062 :         for (i = 0; i < nelems; i++)
    1157             :         {
    1158             :             Publication *pub_elem;
    1159        2076 :             List       *pub_elem_tables = NIL;
    1160             :             ListCell   *lc;
    1161             : 
    1162        2076 :             pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]), false);
    1163             : 
    1164             :             /*
    1165             :              * Publications support partitioned tables. If
    1166             :              * publish_via_partition_root is false, all changes are replicated
    1167             :              * using leaf partition identity and schema, so we only need
    1168             :              * those. Otherwise, get the partitioned table itself.
    1169             :              */
    1170        2076 :             if (pub_elem->alltables)
    1171         344 :                 pub_elem_tables = GetAllPublicationRelations(RELKIND_RELATION,
    1172         344 :                                                              pub_elem->pubviaroot);
    1173             :             else
    1174             :             {
    1175             :                 List       *relids,
    1176             :                            *schemarelids;
    1177             : 
    1178        1732 :                 relids = GetPublicationRelations(pub_elem->oid,
    1179        1732 :                                                  pub_elem->pubviaroot ?
    1180        1732 :                                                  PUBLICATION_PART_ROOT :
    1181             :                                                  PUBLICATION_PART_LEAF);
    1182        1732 :                 schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
    1183        1732 :                                                                 pub_elem->pubviaroot ?
    1184        1732 :                                                                 PUBLICATION_PART_ROOT :
    1185             :                                                                 PUBLICATION_PART_LEAF);
    1186        1732 :                 pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
    1187             :             }
    1188             : 
    1189             :             /*
    1190             :              * Record the published table and the corresponding publication so
    1191             :              * that we can get row filters and column lists later.
    1192             :              *
    1193             :              * When a table is published by multiple publications, to obtain
    1194             :              * all row filters and column lists, the structure related to this
    1195             :              * table will be recorded multiple times.
    1196             :              */
    1197        6302 :             foreach(lc, pub_elem_tables)
    1198             :             {
    1199        4226 :                 published_rel *table_info = (published_rel *) palloc(sizeof(published_rel));
    1200             : 
    1201        4226 :                 table_info->relid = lfirst_oid(lc);
    1202        4226 :                 table_info->pubid = pub_elem->oid;
    1203        4226 :                 table_infos = lappend(table_infos, table_info);
    1204             :             }
    1205             : 
    1206             :             /* At least one publication is using publish_via_partition_root. */
    1207        2076 :             if (pub_elem->pubviaroot)
    1208         368 :                 viaroot = true;
    1209             :         }
    1210             : 
    1211             :         /*
    1212             :          * If the publication publishes partition changes via their respective
    1213             :          * root partitioned tables, we must exclude partitions in favor of
    1214             :          * including the root partitioned tables. Otherwise, the function
    1215             :          * could return both the child and parent tables which could cause
    1216             :          * data of the child table to be double-published on the subscriber
    1217             :          * side.
    1218             :          */
    1219        1986 :         if (viaroot)
    1220         352 :             filter_partitions(table_infos);
    1221             : 
    1222             :         /* Construct a tuple descriptor for the result rows. */
    1223        1986 :         tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
    1224        1986 :         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
    1225             :                            OIDOID, -1, 0);
    1226        1986 :         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
    1227             :                            OIDOID, -1, 0);
    1228        1986 :         TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
    1229             :                            INT2VECTOROID, -1, 0);
    1230        1986 :         TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
    1231             :                            PG_NODE_TREEOID, -1, 0);
    1232             : 
    1233        1986 :         funcctx->tuple_desc = BlessTupleDesc(tupdesc);
    1234        1986 :         funcctx->user_fctx = table_infos;
    1235             : 
    1236        1986 :         MemoryContextSwitchTo(oldcontext);
    1237             :     }
    1238             : 
    1239             :     /* stuff done on every call of the function */
    1240        6132 :     funcctx = SRF_PERCALL_SETUP();
    1241        6132 :     table_infos = (List *) funcctx->user_fctx;
    1242             : 
    1243        6132 :     if (funcctx->call_cntr < list_length(table_infos))
    1244             :     {
    1245        4146 :         HeapTuple   pubtuple = NULL;
    1246             :         HeapTuple   rettuple;
    1247             :         Publication *pub;
    1248        4146 :         published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
    1249        4146 :         Oid         relid = table_info->relid;
    1250        4146 :         Oid         schemaid = get_rel_namespace(relid);
    1251        4146 :         Datum       values[NUM_PUBLICATION_TABLES_ELEM] = {0};
    1252        4146 :         bool        nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
    1253             : 
    1254             :         /*
    1255             :          * Form tuple with appropriate data.
    1256             :          */
    1257             : 
    1258        4146 :         pub = GetPublication(table_info->pubid);
    1259             : 
    1260        4146 :         values[0] = ObjectIdGetDatum(pub->oid);
    1261        4146 :         values[1] = ObjectIdGetDatum(relid);
    1262             : 
    1263             :         /*
    1264             :          * We don't consider row filters or column lists for FOR ALL TABLES or
    1265             :          * FOR TABLES IN SCHEMA publications.
    1266             :          */
    1267        4146 :         if (!pub->alltables &&
    1268        2634 :             !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
    1269             :                                    ObjectIdGetDatum(schemaid),
    1270             :                                    ObjectIdGetDatum(pub->oid)))
    1271        2540 :             pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
    1272             :                                            ObjectIdGetDatum(relid),
    1273             :                                            ObjectIdGetDatum(pub->oid));
    1274             : 
    1275        4146 :         if (HeapTupleIsValid(pubtuple))
    1276             :         {
    1277             :             /* Lookup the column list attribute. */
    1278        2314 :             values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
    1279             :                                         Anum_pg_publication_rel_prattrs,
    1280             :                                         &(nulls[2]));
    1281             : 
    1282             :             /* Null indicates no filter. */
    1283        2314 :             values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
    1284             :                                         Anum_pg_publication_rel_prqual,
    1285             :                                         &(nulls[3]));
    1286             :         }
    1287             :         else
    1288             :         {
    1289        1832 :             nulls[2] = true;
    1290        1832 :             nulls[3] = true;
    1291             :         }
    1292             : 
    1293             :         /* Show all columns when the column list is not specified. */
    1294        4146 :         if (nulls[2])
    1295             :         {
    1296        3894 :             Relation    rel = table_open(relid, AccessShareLock);
    1297        3894 :             int         nattnums = 0;
    1298             :             int16      *attnums;
    1299        3894 :             TupleDesc   desc = RelationGetDescr(rel);
    1300             :             int         i;
    1301             : 
    1302        3894 :             attnums = (int16 *) palloc(desc->natts * sizeof(int16));
    1303             : 
    1304       10698 :             for (i = 0; i < desc->natts; i++)
    1305             :             {
    1306        6804 :                 Form_pg_attribute att = TupleDescAttr(desc, i);
    1307             : 
    1308        6804 :                 if (att->attisdropped)
    1309          12 :                     continue;
    1310             : 
    1311        6792 :                 if (att->attgenerated)
    1312             :                 {
    1313             :                     /* We only support replication of STORED generated cols. */
    1314          96 :                     if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
    1315          72 :                         continue;
    1316             : 
    1317             :                     /*
    1318             :                      * User hasn't requested to replicate STORED generated
    1319             :                      * cols.
    1320             :                      */
    1321          24 :                     if (pub->pubgencols_type != PUBLISH_GENCOLS_STORED)
    1322          18 :                         continue;
    1323             :                 }
    1324             : 
    1325        6702 :                 attnums[nattnums++] = att->attnum;
    1326             :             }
    1327             : 
    1328        3894 :             if (nattnums > 0)
    1329             :             {
    1330        3850 :                 values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
    1331        3850 :                 nulls[2] = false;
    1332             :             }
    1333             : 
    1334        3894 :             table_close(rel, AccessShareLock);
    1335             :         }
    1336             : 
    1337        4146 :         rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
    1338             : 
    1339        4146 :         SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
    1340             :     }
    1341             : 
    1342        1986 :     SRF_RETURN_DONE(funcctx);
    1343             : }
    1344             : 
    1345             : /*
    1346             :  * Returns Oids of sequences in a publication.
    1347             :  */
    1348             : Datum
    1349           0 : pg_get_publication_sequences(PG_FUNCTION_ARGS)
    1350             : {
    1351             :     FuncCallContext *funcctx;
    1352           0 :     List       *sequences = NIL;
    1353             : 
    1354             :     /* stuff done only on the first call of the function */
    1355           0 :     if (SRF_IS_FIRSTCALL())
    1356             :     {
    1357           0 :         char       *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1358             :         Publication *publication;
    1359             :         MemoryContext oldcontext;
    1360             : 
    1361             :         /* create a function context for cross-call persistence */
    1362           0 :         funcctx = SRF_FIRSTCALL_INIT();
    1363             : 
    1364             :         /* switch to memory context appropriate for multiple function calls */
    1365           0 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    1366             : 
    1367           0 :         publication = GetPublicationByName(pubname, false);
    1368             : 
    1369           0 :         if (publication->allsequences)
    1370           0 :             sequences = GetAllPublicationRelations(RELKIND_SEQUENCE, false);
    1371             : 
    1372           0 :         funcctx->user_fctx = (void *) sequences;
    1373             : 
    1374           0 :         MemoryContextSwitchTo(oldcontext);
    1375             :     }
    1376             : 
    1377             :     /* stuff done on every call of the function */
    1378           0 :     funcctx = SRF_PERCALL_SETUP();
    1379           0 :     sequences = (List *) funcctx->user_fctx;
    1380             : 
    1381           0 :     if (funcctx->call_cntr < list_length(sequences))
    1382             :     {
    1383           0 :         Oid         relid = list_nth_oid(sequences, funcctx->call_cntr);
    1384             : 
    1385           0 :         SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
    1386             :     }
    1387             : 
    1388           0 :     SRF_RETURN_DONE(funcctx);
    1389             : }

Generated by: LCOV version 1.16