LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_publication.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 410 415 98.8 %
Date: 2024-04-24 06:11:09 Functions: 26 26 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_publication.c
       4             :  *      publication C API manipulation
       5             :  *
       6             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      src/backend/catalog/pg_publication.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/genam.h"
      18             : #include "access/heapam.h"
      19             : #include "access/htup_details.h"
      20             : #include "access/tableam.h"
      21             : #include "catalog/catalog.h"
      22             : #include "catalog/dependency.h"
      23             : #include "catalog/indexing.h"
      24             : #include "catalog/namespace.h"
      25             : #include "catalog/objectaddress.h"
      26             : #include "catalog/partition.h"
      27             : #include "catalog/pg_inherits.h"
      28             : #include "catalog/pg_namespace.h"
      29             : #include "catalog/pg_publication.h"
      30             : #include "catalog/pg_publication_namespace.h"
      31             : #include "catalog/pg_publication_rel.h"
      32             : #include "catalog/pg_type.h"
      33             : #include "commands/publicationcmds.h"
      34             : #include "funcapi.h"
      35             : #include "utils/array.h"
      36             : #include "utils/builtins.h"
      37             : #include "utils/catcache.h"
      38             : #include "utils/fmgroids.h"
      39             : #include "utils/lsyscache.h"
      40             : #include "utils/rel.h"
      41             : #include "utils/syscache.h"
      42             : 
      43             : /* Records association between publication and published table */
      44             : typedef struct
      45             : {
      46             :     Oid         relid;          /* OID of published table */
      47             :     Oid         pubid;          /* OID of publication that publishes this
      48             :                                  * table. */
      49             : } published_rel;
      50             : 
      51             : static void publication_translate_columns(Relation targetrel, List *columns,
      52             :                                           int *natts, AttrNumber **attrs);
      53             : 
      54             : /*
      55             :  * Check if relation can be in given publication and throws appropriate
      56             :  * error if not.
      57             :  */
      58             : static void
      59         986 : check_publication_add_relation(Relation targetrel)
      60             : {
      61             :     /* Must be a regular or partitioned table */
      62         986 :     if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
      63         142 :         RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
      64          14 :         ereport(ERROR,
      65             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      66             :                  errmsg("cannot add relation \"%s\" to publication",
      67             :                         RelationGetRelationName(targetrel)),
      68             :                  errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind)));
      69             : 
      70             :     /* Can't be system table */
      71         972 :     if (IsCatalogRelation(targetrel))
      72           6 :         ereport(ERROR,
      73             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      74             :                  errmsg("cannot add relation \"%s\" to publication",
      75             :                         RelationGetRelationName(targetrel)),
      76             :                  errdetail("This operation is not supported for system tables.")));
      77             : 
      78             :     /* UNLOGGED and TEMP relations cannot be part of publication. */
      79         966 :     if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
      80           6 :         ereport(ERROR,
      81             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      82             :                  errmsg("cannot add relation \"%s\" to publication",
      83             :                         RelationGetRelationName(targetrel)),
      84             :                  errdetail("This operation is not supported for temporary tables.")));
      85         960 :     else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
      86           6 :         ereport(ERROR,
      87             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      88             :                  errmsg("cannot add relation \"%s\" to publication",
      89             :                         RelationGetRelationName(targetrel)),
      90             :                  errdetail("This operation is not supported for unlogged tables.")));
      91         954 : }
      92             : 
      93             : /*
      94             :  * Check if schema can be in given publication and throw appropriate error if
      95             :  * not.
      96             :  */
      97             : static void
      98         204 : check_publication_add_schema(Oid schemaid)
      99             : {
     100             :     /* Can't be system namespace */
     101         204 :     if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid))
     102           6 :         ereport(ERROR,
     103             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     104             :                  errmsg("cannot add schema \"%s\" to publication",
     105             :                         get_namespace_name(schemaid)),
     106             :                  errdetail("This operation is not supported for system schemas.")));
     107             : 
     108             :     /* Can't be temporary namespace */
     109         198 :     if (isAnyTempNamespace(schemaid))
     110           0 :         ereport(ERROR,
     111             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     112             :                  errmsg("cannot add schema \"%s\" to publication",
     113             :                         get_namespace_name(schemaid)),
     114             :                  errdetail("Temporary schemas cannot be replicated.")));
     115         198 : }
     116             : 
     117             : /*
     118             :  * Returns if relation represented by oid and Form_pg_class entry
     119             :  * is publishable.
     120             :  *
     121             :  * Does same checks as check_publication_add_relation() above, but does not
     122             :  * need relation to be opened and also does not throw errors.
     123             :  *
     124             :  * XXX  This also excludes all tables with relid < FirstNormalObjectId,
     125             :  * ie all tables created during initdb.  This mainly affects the preinstalled
     126             :  * information_schema.  IsCatalogRelationOid() only excludes tables with
     127             :  * relid < FirstUnpinnedObjectId, making that test rather redundant,
     128             :  * but really we should get rid of the FirstNormalObjectId test not
     129             :  * IsCatalogRelationOid.  We can't do so today because we don't want
     130             :  * information_schema tables to be considered publishable; but this test
     131             :  * is really inadequate for that, since the information_schema could be
     132             :  * dropped and reloaded and then it'll be considered publishable.  The best
     133             :  * long-term solution may be to add a "relispublishable" bool to pg_class,
     134             :  * and depend on that instead of OID checks.
     135             :  */
     136             : static bool
     137      591384 : is_publishable_class(Oid relid, Form_pg_class reltuple)
     138             : {
     139      604122 :     return (reltuple->relkind == RELKIND_RELATION ||
     140       12738 :             reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
     141      579912 :         !IsCatalogRelationOid(relid) &&
     142     1182768 :         reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
     143             :         relid >= FirstNormalObjectId;
     144             : }
     145             : 
     146             : /*
     147             :  * Another variant of is_publishable_class(), taking a Relation.
     148             :  */
     149             : bool
     150      538252 : is_publishable_relation(Relation rel)
     151             : {
     152      538252 :     return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
     153             : }
     154             : 
     155             : /*
     156             :  * SQL-callable variant of the above
     157             :  *
     158             :  * This returns null when the relation does not exist.  This is intended to be
     159             :  * used for example in psql to avoid gratuitous errors when there are
     160             :  * concurrent catalog changes.
     161             :  */
     162             : Datum
     163        5316 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
     164             : {
     165        5316 :     Oid         relid = PG_GETARG_OID(0);
     166             :     HeapTuple   tuple;
     167             :     bool        result;
     168             : 
     169        5316 :     tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
     170        5316 :     if (!HeapTupleIsValid(tuple))
     171           0 :         PG_RETURN_NULL();
     172        5316 :     result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
     173        5316 :     ReleaseSysCache(tuple);
     174        5316 :     PG_RETURN_BOOL(result);
     175             : }
     176             : 
     177             : /*
     178             :  * Returns true if the ancestor is in the list of published relations.
     179             :  * Otherwise, returns false.
     180             :  */
     181             : static bool
     182         198 : is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
     183             : {
     184             :     ListCell   *lc;
     185             : 
     186         678 :     foreach(lc, table_infos)
     187             :     {
     188         560 :         Oid         relid = ((published_rel *) lfirst(lc))->relid;
     189             : 
     190         560 :         if (relid == ancestor)
     191          80 :             return true;
     192             :     }
     193             : 
     194         118 :     return false;
     195             : }
     196             : 
     197             : /*
     198             :  * Filter out the partitions whose parent tables are also present in the list.
     199             :  */
     200             : static void
     201         336 : filter_partitions(List *table_infos)
     202             : {
     203             :     ListCell   *lc;
     204             : 
     205         998 :     foreach(lc, table_infos)
     206             :     {
     207         662 :         bool        skip = false;
     208         662 :         List       *ancestors = NIL;
     209             :         ListCell   *lc2;
     210         662 :         published_rel *table_info = (published_rel *) lfirst(lc);
     211             : 
     212         662 :         if (get_rel_relispartition(table_info->relid))
     213         198 :             ancestors = get_partition_ancestors(table_info->relid);
     214             : 
     215         780 :         foreach(lc2, ancestors)
     216             :         {
     217         198 :             Oid         ancestor = lfirst_oid(lc2);
     218             : 
     219         198 :             if (is_ancestor_member_tableinfos(ancestor, table_infos))
     220             :             {
     221          80 :                 skip = true;
     222          80 :                 break;
     223             :             }
     224             :         }
     225             : 
     226         662 :         if (skip)
     227          80 :             table_infos = foreach_delete_current(table_infos, lc);
     228             :     }
     229         336 : }
     230             : 
     231             : /*
     232             :  * Returns true if any schema is associated with the publication, false if no
     233             :  * schema is associated with the publication.
     234             :  */
     235             : bool
     236         256 : is_schema_publication(Oid pubid)
     237             : {
     238             :     Relation    pubschsrel;
     239             :     ScanKeyData scankey;
     240             :     SysScanDesc scan;
     241             :     HeapTuple   tup;
     242         256 :     bool        result = false;
     243             : 
     244         256 :     pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
     245         256 :     ScanKeyInit(&scankey,
     246             :                 Anum_pg_publication_namespace_pnpubid,
     247             :                 BTEqualStrategyNumber, F_OIDEQ,
     248             :                 ObjectIdGetDatum(pubid));
     249             : 
     250         256 :     scan = systable_beginscan(pubschsrel,
     251             :                               PublicationNamespacePnnspidPnpubidIndexId,
     252             :                               true, NULL, 1, &scankey);
     253         256 :     tup = systable_getnext(scan);
     254         256 :     result = HeapTupleIsValid(tup);
     255             : 
     256         256 :     systable_endscan(scan);
     257         256 :     table_close(pubschsrel, AccessShareLock);
     258             : 
     259         256 :     return result;
     260             : }
     261             : 
     262             : /*
     263             :  * Gets the relations based on the publication partition option for a specified
     264             :  * relation.
     265             :  */
     266             : List *
     267        5204 : GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
     268             :                                Oid relid)
     269             : {
     270        5204 :     if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
     271             :         pub_partopt != PUBLICATION_PART_ROOT)
     272        1032 :     {
     273        1032 :         List       *all_parts = find_all_inheritors(relid, NoLock,
     274             :                                                     NULL);
     275             : 
     276        1032 :         if (pub_partopt == PUBLICATION_PART_ALL)
     277         838 :             result = list_concat(result, all_parts);
     278         194 :         else if (pub_partopt == PUBLICATION_PART_LEAF)
     279             :         {
     280             :             ListCell   *lc;
     281             : 
     282         710 :             foreach(lc, all_parts)
     283             :             {
     284         516 :                 Oid         partOid = lfirst_oid(lc);
     285             : 
     286         516 :                 if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
     287         322 :                     result = lappend_oid(result, partOid);
     288             :             }
     289             :         }
     290             :         else
     291             :             Assert(false);
     292             :     }
     293             :     else
     294        4172 :         result = lappend_oid(result, relid);
     295             : 
     296        5204 :     return result;
     297             : }
     298             : 
     299             : /*
     300             :  * Returns the relid of the topmost ancestor that is published via this
     301             :  * publication if any and set its ancestor level to ancestor_level,
     302             :  * otherwise returns InvalidOid.
     303             :  *
     304             :  * The ancestor_level value allows us to compare the results for multiple
     305             :  * publications, and decide which value is higher up.
     306             :  *
     307             :  * Note that the list of ancestors should be ordered such that the topmost
     308             :  * ancestor is at the end of the list.
     309             :  */
     310             : Oid
     311         432 : GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
     312             : {
     313             :     ListCell   *lc;
     314         432 :     Oid         topmost_relid = InvalidOid;
     315         432 :     int         level = 0;
     316             : 
     317             :     /*
     318             :      * Find the "topmost" ancestor that is in this publication.
     319             :      */
     320         878 :     foreach(lc, ancestors)
     321             :     {
     322         446 :         Oid         ancestor = lfirst_oid(lc);
     323         446 :         List       *apubids = GetRelationPublications(ancestor);
     324         446 :         List       *aschemaPubids = NIL;
     325             : 
     326         446 :         level++;
     327             : 
     328         446 :         if (list_member_oid(apubids, puboid))
     329             :         {
     330         292 :             topmost_relid = ancestor;
     331             : 
     332         292 :             if (ancestor_level)
     333          70 :                 *ancestor_level = level;
     334             :         }
     335             :         else
     336             :         {
     337         154 :             aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
     338         154 :             if (list_member_oid(aschemaPubids, puboid))
     339             :             {
     340          10 :                 topmost_relid = ancestor;
     341             : 
     342          10 :                 if (ancestor_level)
     343          10 :                     *ancestor_level = level;
     344             :             }
     345             :         }
     346             : 
     347         446 :         list_free(apubids);
     348         446 :         list_free(aschemaPubids);
     349             :     }
     350             : 
     351         432 :     return topmost_relid;
     352             : }
     353             : 
     354             : /*
     355             :  * Insert new publication / relation mapping.
     356             :  */
     357             : ObjectAddress
     358        1008 : publication_add_relation(Oid pubid, PublicationRelInfo *pri,
     359             :                          bool if_not_exists)
     360             : {
     361             :     Relation    rel;
     362             :     HeapTuple   tup;
     363             :     Datum       values[Natts_pg_publication_rel];
     364             :     bool        nulls[Natts_pg_publication_rel];
     365        1008 :     Relation    targetrel = pri->relation;
     366        1008 :     Oid         relid = RelationGetRelid(targetrel);
     367             :     Oid         pubreloid;
     368        1008 :     Publication *pub = GetPublication(pubid);
     369        1008 :     AttrNumber *attarray = NULL;
     370        1008 :     int         natts = 0;
     371             :     ObjectAddress myself,
     372             :                 referenced;
     373        1008 :     List       *relids = NIL;
     374             : 
     375        1008 :     rel = table_open(PublicationRelRelationId, RowExclusiveLock);
     376             : 
     377             :     /*
     378             :      * Check for duplicates. Note that this does not really prevent
     379             :      * duplicates, it's here just to provide nicer error message in common
     380             :      * case. The real protection is the unique key on the catalog.
     381             :      */
     382        1008 :     if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
     383             :                               ObjectIdGetDatum(pubid)))
     384             :     {
     385          22 :         table_close(rel, RowExclusiveLock);
     386             : 
     387          22 :         if (if_not_exists)
     388          16 :             return InvalidObjectAddress;
     389             : 
     390           6 :         ereport(ERROR,
     391             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     392             :                  errmsg("relation \"%s\" is already member of publication \"%s\"",
     393             :                         RelationGetRelationName(targetrel), pub->name)));
     394             :     }
     395             : 
     396         986 :     check_publication_add_relation(targetrel);
     397             : 
     398             :     /*
     399             :      * Translate column names to attnums and make sure the column list
     400             :      * contains only allowed elements (no system or generated columns etc.).
     401             :      * Also build an array of attnums, for storing in the catalog.
     402             :      */
     403         954 :     publication_translate_columns(pri->relation, pri->columns,
     404             :                                   &natts, &attarray);
     405             : 
     406             :     /* Form a tuple. */
     407         936 :     memset(values, 0, sizeof(values));
     408         936 :     memset(nulls, false, sizeof(nulls));
     409             : 
     410         936 :     pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
     411             :                                    Anum_pg_publication_rel_oid);
     412         936 :     values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid);
     413         936 :     values[Anum_pg_publication_rel_prpubid - 1] =
     414         936 :         ObjectIdGetDatum(pubid);
     415         936 :     values[Anum_pg_publication_rel_prrelid - 1] =
     416         936 :         ObjectIdGetDatum(relid);
     417             : 
     418             :     /* Add qualifications, if available */
     419         936 :     if (pri->whereClause != NULL)
     420         298 :         values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
     421             :     else
     422         638 :         nulls[Anum_pg_publication_rel_prqual - 1] = true;
     423             : 
     424             :     /* Add column list, if available */
     425         936 :     if (pri->columns)
     426         272 :         values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(buildint2vector(attarray, natts));
     427             :     else
     428         664 :         nulls[Anum_pg_publication_rel_prattrs - 1] = true;
     429             : 
     430         936 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     431             : 
     432             :     /* Insert tuple into catalog. */
     433         936 :     CatalogTupleInsert(rel, tup);
     434         936 :     heap_freetuple(tup);
     435             : 
     436             :     /* Register dependencies as needed */
     437         936 :     ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
     438             : 
     439             :     /* Add dependency on the publication */
     440         936 :     ObjectAddressSet(referenced, PublicationRelationId, pubid);
     441         936 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     442             : 
     443             :     /* Add dependency on the relation */
     444         936 :     ObjectAddressSet(referenced, RelationRelationId, relid);
     445         936 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     446             : 
     447             :     /* Add dependency on the objects mentioned in the qualifications */
     448         936 :     if (pri->whereClause)
     449         298 :         recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
     450             :                                         DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
     451             :                                         false);
     452             : 
     453             :     /* Add dependency on the columns, if any are listed */
     454        1412 :     for (int i = 0; i < natts; i++)
     455             :     {
     456         476 :         ObjectAddressSubSet(referenced, RelationRelationId, relid, attarray[i]);
     457         476 :         recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
     458             :     }
     459             : 
     460             :     /* Close the table. */
     461         936 :     table_close(rel, RowExclusiveLock);
     462             : 
     463             :     /*
     464             :      * Invalidate relcache so that publication info is rebuilt.
     465             :      *
     466             :      * For the partitioned tables, we must invalidate all partitions contained
     467             :      * in the respective partition hierarchies, not just the one explicitly
     468             :      * mentioned in the publication. This is required because we implicitly
     469             :      * publish the child tables when the parent table is published.
     470             :      */
     471         936 :     relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
     472             :                                             relid);
     473             : 
     474         936 :     InvalidatePublicationRels(relids);
     475             : 
     476         936 :     return myself;
     477             : }
     478             : 
     479             : /* qsort comparator for attnums */
     480             : static int
     481         204 : compare_int16(const void *a, const void *b)
     482             : {
     483         204 :     int         av = *(const int16 *) a;
     484         204 :     int         bv = *(const int16 *) b;
     485             : 
     486             :     /* this can't overflow if int is wider than int16 */
     487         204 :     return (av - bv);
     488             : }
     489             : 
     490             : /*
     491             :  * Translate a list of column names to an array of attribute numbers
     492             :  * and a Bitmapset with them; verify that each attribute is appropriate
     493             :  * to have in a publication column list (no system or generated attributes,
     494             :  * no duplicates).  Additional checks with replica identity are done later;
     495             :  * see pub_collist_contains_invalid_column.
     496             :  *
     497             :  * Note that the attribute numbers are *not* offset by
     498             :  * FirstLowInvalidHeapAttributeNumber; system columns are forbidden so this
     499             :  * is okay.
     500             :  */
     501             : static void
     502         954 : publication_translate_columns(Relation targetrel, List *columns,
     503             :                               int *natts, AttrNumber **attrs)
     504             : {
     505         954 :     AttrNumber *attarray = NULL;
     506         954 :     Bitmapset  *set = NULL;
     507             :     ListCell   *lc;
     508         954 :     int         n = 0;
     509         954 :     TupleDesc   tupdesc = RelationGetDescr(targetrel);
     510             : 
     511             :     /* Bail out when no column list defined. */
     512         954 :     if (!columns)
     513         664 :         return;
     514             : 
     515             :     /*
     516             :      * Translate list of columns to attnums. We prohibit system attributes and
     517             :      * make sure there are no duplicate columns.
     518             :      */
     519         290 :     attarray = palloc(sizeof(AttrNumber) * list_length(columns));
     520         784 :     foreach(lc, columns)
     521             :     {
     522         512 :         char       *colname = strVal(lfirst(lc));
     523         512 :         AttrNumber  attnum = get_attnum(RelationGetRelid(targetrel), colname);
     524             : 
     525         512 :         if (attnum == InvalidAttrNumber)
     526           6 :             ereport(ERROR,
     527             :                     errcode(ERRCODE_UNDEFINED_COLUMN),
     528             :                     errmsg("column \"%s\" of relation \"%s\" does not exist",
     529             :                            colname, RelationGetRelationName(targetrel)));
     530             : 
     531         506 :         if (!AttrNumberIsForUserDefinedAttr(attnum))
     532           6 :             ereport(ERROR,
     533             :                     errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     534             :                     errmsg("cannot use system column \"%s\" in publication column list",
     535             :                            colname));
     536             : 
     537         500 :         if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated)
     538           6 :             ereport(ERROR,
     539             :                     errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     540             :                     errmsg("cannot use generated column \"%s\" in publication column list",
     541             :                            colname));
     542             : 
     543         494 :         if (bms_is_member(attnum, set))
     544           0 :             ereport(ERROR,
     545             :                     errcode(ERRCODE_DUPLICATE_OBJECT),
     546             :                     errmsg("duplicate column \"%s\" in publication column list",
     547             :                            colname));
     548             : 
     549         494 :         set = bms_add_member(set, attnum);
     550         494 :         attarray[n++] = attnum;
     551             :     }
     552             : 
     553             :     /* Be tidy, so that the catalog representation is always sorted */
     554         272 :     qsort(attarray, n, sizeof(AttrNumber), compare_int16);
     555             : 
     556         272 :     *natts = n;
     557         272 :     *attrs = attarray;
     558             : 
     559         272 :     bms_free(set);
     560             : }
     561             : 
     562             : /*
     563             :  * Transform a column list (represented by an array Datum) to a bitmapset.
     564             :  *
     565             :  * If columns isn't NULL, add the column numbers to that set.
     566             :  *
     567             :  * If mcxt isn't NULL, build the bitmapset in that context.
     568             :  */
     569             : Bitmapset *
     570         428 : pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
     571             : {
     572         428 :     Bitmapset  *result = NULL;
     573             :     ArrayType  *arr;
     574             :     int         nelems;
     575             :     int16      *elems;
     576         428 :     MemoryContext oldcxt = NULL;
     577             : 
     578             :     /*
     579             :      * If an existing bitmap was provided, use it. Otherwise just use NULL and
     580             :      * build a new bitmap.
     581             :      */
     582         428 :     if (columns)
     583           0 :         result = columns;
     584             : 
     585         428 :     arr = DatumGetArrayTypeP(pubcols);
     586         428 :     nelems = ARR_DIMS(arr)[0];
     587         428 :     elems = (int16 *) ARR_DATA_PTR(arr);
     588             : 
     589             :     /* If a memory context was specified, switch to it. */
     590         428 :     if (mcxt)
     591          74 :         oldcxt = MemoryContextSwitchTo(mcxt);
     592             : 
     593        1188 :     for (int i = 0; i < nelems; i++)
     594         760 :         result = bms_add_member(result, elems[i]);
     595             : 
     596         428 :     if (mcxt)
     597          74 :         MemoryContextSwitchTo(oldcxt);
     598             : 
     599         428 :     return result;
     600             : }
     601             : 
     602             : /*
     603             :  * Insert new publication / schema mapping.
     604             :  */
     605             : ObjectAddress
     606         222 : publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
     607             : {
     608             :     Relation    rel;
     609             :     HeapTuple   tup;
     610             :     Datum       values[Natts_pg_publication_namespace];
     611             :     bool        nulls[Natts_pg_publication_namespace];
     612             :     Oid         psschid;
     613         222 :     Publication *pub = GetPublication(pubid);
     614         222 :     List       *schemaRels = NIL;
     615             :     ObjectAddress myself,
     616             :                 referenced;
     617             : 
     618         222 :     rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
     619             : 
     620             :     /*
     621             :      * Check for duplicates. Note that this does not really prevent
     622             :      * duplicates, it's here just to provide nicer error message in common
     623             :      * case. The real protection is the unique key on the catalog.
     624             :      */
     625         222 :     if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
     626             :                               ObjectIdGetDatum(schemaid),
     627             :                               ObjectIdGetDatum(pubid)))
     628             :     {
     629          18 :         table_close(rel, RowExclusiveLock);
     630             : 
     631          18 :         if (if_not_exists)
     632          12 :             return InvalidObjectAddress;
     633             : 
     634           6 :         ereport(ERROR,
     635             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     636             :                  errmsg("schema \"%s\" is already member of publication \"%s\"",
     637             :                         get_namespace_name(schemaid), pub->name)));
     638             :     }
     639             : 
     640         204 :     check_publication_add_schema(schemaid);
     641             : 
     642             :     /* Form a tuple */
     643         198 :     memset(values, 0, sizeof(values));
     644         198 :     memset(nulls, false, sizeof(nulls));
     645             : 
     646         198 :     psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId,
     647             :                                  Anum_pg_publication_namespace_oid);
     648         198 :     values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid);
     649         198 :     values[Anum_pg_publication_namespace_pnpubid - 1] =
     650         198 :         ObjectIdGetDatum(pubid);
     651         198 :     values[Anum_pg_publication_namespace_pnnspid - 1] =
     652         198 :         ObjectIdGetDatum(schemaid);
     653             : 
     654         198 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     655             : 
     656             :     /* Insert tuple into catalog */
     657         198 :     CatalogTupleInsert(rel, tup);
     658         198 :     heap_freetuple(tup);
     659             : 
     660         198 :     ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid);
     661             : 
     662             :     /* Add dependency on the publication */
     663         198 :     ObjectAddressSet(referenced, PublicationRelationId, pubid);
     664         198 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     665             : 
     666             :     /* Add dependency on the schema */
     667         198 :     ObjectAddressSet(referenced, NamespaceRelationId, schemaid);
     668         198 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     669             : 
     670             :     /* Close the table */
     671         198 :     table_close(rel, RowExclusiveLock);
     672             : 
     673             :     /*
     674             :      * Invalidate relcache so that publication info is rebuilt. See
     675             :      * publication_add_relation for why we need to consider all the
     676             :      * partitions.
     677             :      */
     678         198 :     schemaRels = GetSchemaPublicationRelations(schemaid,
     679             :                                                PUBLICATION_PART_ALL);
     680         198 :     InvalidatePublicationRels(schemaRels);
     681             : 
     682         198 :     return myself;
     683             : }
     684             : 
     685             : /* Gets list of publication oids for a relation */
     686             : List *
     687       11098 : GetRelationPublications(Oid relid)
     688             : {
     689       11098 :     List       *result = NIL;
     690             :     CatCList   *pubrellist;
     691             :     int         i;
     692             : 
     693             :     /* Find all publications associated with the relation. */
     694       11098 :     pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
     695             :                                      ObjectIdGetDatum(relid));
     696       12522 :     for (i = 0; i < pubrellist->n_members; i++)
     697             :     {
     698        1424 :         HeapTuple   tup = &pubrellist->members[i]->tuple;
     699        1424 :         Oid         pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
     700             : 
     701        1424 :         result = lappend_oid(result, pubid);
     702             :     }
     703             : 
     704       11098 :     ReleaseSysCacheList(pubrellist);
     705             : 
     706       11098 :     return result;
     707             : }
     708             : 
     709             : /*
     710             :  * Gets list of relation oids for a publication.
     711             :  *
     712             :  * This should only be used FOR TABLE publications, the FOR ALL TABLES
     713             :  * should use GetAllTablesPublicationRelations().
     714             :  */
     715             : List *
     716        2074 : GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
     717             : {
     718             :     List       *result;
     719             :     Relation    pubrelsrel;
     720             :     ScanKeyData scankey;
     721             :     SysScanDesc scan;
     722             :     HeapTuple   tup;
     723             : 
     724             :     /* Find all publications associated with the relation. */
     725        2074 :     pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
     726             : 
     727        2074 :     ScanKeyInit(&scankey,
     728             :                 Anum_pg_publication_rel_prpubid,
     729             :                 BTEqualStrategyNumber, F_OIDEQ,
     730             :                 ObjectIdGetDatum(pubid));
     731             : 
     732        2074 :     scan = systable_beginscan(pubrelsrel, PublicationRelPrpubidIndexId,
     733             :                               true, NULL, 1, &scankey);
     734             : 
     735        2074 :     result = NIL;
     736        4908 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     737             :     {
     738             :         Form_pg_publication_rel pubrel;
     739             : 
     740        2834 :         pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
     741        2834 :         result = GetPubPartitionOptionRelations(result, pub_partopt,
     742             :                                                 pubrel->prrelid);
     743             :     }
     744             : 
     745        2074 :     systable_endscan(scan);
     746        2074 :     table_close(pubrelsrel, AccessShareLock);
     747             : 
     748             :     /* Now sort and de-duplicate the result list */
     749        2074 :     list_sort(result, list_oid_cmp);
     750        2074 :     list_deduplicate_oid(result);
     751             : 
     752        2074 :     return result;
     753             : }
     754             : 
     755             : /*
     756             :  * Gets list of publication oids for publications marked as FOR ALL TABLES.
     757             :  */
     758             : List *
     759        7698 : GetAllTablesPublications(void)
     760             : {
     761             :     List       *result;
     762             :     Relation    rel;
     763             :     ScanKeyData scankey;
     764             :     SysScanDesc scan;
     765             :     HeapTuple   tup;
     766             : 
     767             :     /* Find all publications that are marked as for all tables. */
     768        7698 :     rel = table_open(PublicationRelationId, AccessShareLock);
     769             : 
     770        7698 :     ScanKeyInit(&scankey,
     771             :                 Anum_pg_publication_puballtables,
     772             :                 BTEqualStrategyNumber, F_BOOLEQ,
     773             :                 BoolGetDatum(true));
     774             : 
     775        7698 :     scan = systable_beginscan(rel, InvalidOid, false,
     776             :                               NULL, 1, &scankey);
     777             : 
     778        7698 :     result = NIL;
     779        7844 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     780             :     {
     781         146 :         Oid         oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
     782             : 
     783         146 :         result = lappend_oid(result, oid);
     784             :     }
     785             : 
     786        7698 :     systable_endscan(scan);
     787        7698 :     table_close(rel, AccessShareLock);
     788             : 
     789        7698 :     return result;
     790             : }
     791             : 
     792             : /*
     793             :  * Gets list of all relation published by FOR ALL TABLES publication(s).
     794             :  *
     795             :  * If the publication publishes partition changes via their respective root
     796             :  * partitioned tables, we must exclude partitions in favor of including the
     797             :  * root partitioned tables.
     798             :  */
     799             : List *
     800         284 : GetAllTablesPublicationRelations(bool pubviaroot)
     801             : {
     802             :     Relation    classRel;
     803             :     ScanKeyData key[1];
     804             :     TableScanDesc scan;
     805             :     HeapTuple   tuple;
     806         284 :     List       *result = NIL;
     807             : 
     808         284 :     classRel = table_open(RelationRelationId, AccessShareLock);
     809             : 
     810         284 :     ScanKeyInit(&key[0],
     811             :                 Anum_pg_class_relkind,
     812             :                 BTEqualStrategyNumber, F_CHAREQ,
     813             :                 CharGetDatum(RELKIND_RELATION));
     814             : 
     815         284 :     scan = table_beginscan_catalog(classRel, 1, key);
     816             : 
     817       21026 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
     818             :     {
     819       20742 :         Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
     820       20742 :         Oid         relid = relForm->oid;
     821             : 
     822       20742 :         if (is_publishable_class(relid, relForm) &&
     823        1418 :             !(relForm->relispartition && pubviaroot))
     824        1236 :             result = lappend_oid(result, relid);
     825             :     }
     826             : 
     827         284 :     table_endscan(scan);
     828             : 
     829         284 :     if (pubviaroot)
     830             :     {
     831          26 :         ScanKeyInit(&key[0],
     832             :                     Anum_pg_class_relkind,
     833             :                     BTEqualStrategyNumber, F_CHAREQ,
     834             :                     CharGetDatum(RELKIND_PARTITIONED_TABLE));
     835             : 
     836          26 :         scan = table_beginscan_catalog(classRel, 1, key);
     837             : 
     838         156 :         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
     839             :         {
     840         130 :             Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
     841         130 :             Oid         relid = relForm->oid;
     842             : 
     843         130 :             if (is_publishable_class(relid, relForm) &&
     844         130 :                 !relForm->relispartition)
     845         104 :                 result = lappend_oid(result, relid);
     846             :         }
     847             : 
     848          26 :         table_endscan(scan);
     849             :     }
     850             : 
     851         284 :     table_close(classRel, AccessShareLock);
     852         284 :     return result;
     853             : }
     854             : 
     855             : /*
     856             :  * Gets the list of schema oids for a publication.
     857             :  *
     858             :  * This should only be used FOR TABLES IN SCHEMA publications.
     859             :  */
     860             : List *
     861        2004 : GetPublicationSchemas(Oid pubid)
     862             : {
     863        2004 :     List       *result = NIL;
     864             :     Relation    pubschsrel;
     865             :     ScanKeyData scankey;
     866             :     SysScanDesc scan;
     867             :     HeapTuple   tup;
     868             : 
     869             :     /* Find all schemas associated with the publication */
     870        2004 :     pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
     871             : 
     872        2004 :     ScanKeyInit(&scankey,
     873             :                 Anum_pg_publication_namespace_pnpubid,
     874             :                 BTEqualStrategyNumber, F_OIDEQ,
     875             :                 ObjectIdGetDatum(pubid));
     876             : 
     877        2004 :     scan = systable_beginscan(pubschsrel,
     878             :                               PublicationNamespacePnnspidPnpubidIndexId,
     879             :                               true, NULL, 1, &scankey);
     880        2104 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     881             :     {
     882             :         Form_pg_publication_namespace pubsch;
     883             : 
     884         100 :         pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
     885             : 
     886         100 :         result = lappend_oid(result, pubsch->pnnspid);
     887             :     }
     888             : 
     889        2004 :     systable_endscan(scan);
     890        2004 :     table_close(pubschsrel, AccessShareLock);
     891             : 
     892        2004 :     return result;
     893             : }
     894             : 
     895             : /*
     896             :  * Gets the list of publication oids associated with a specified schema.
     897             :  */
     898             : List *
     899       10762 : GetSchemaPublications(Oid schemaid)
     900             : {
     901       10762 :     List       *result = NIL;
     902             :     CatCList   *pubschlist;
     903             :     int         i;
     904             : 
     905             :     /* Find all publications associated with the schema */
     906       10762 :     pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
     907             :                                      ObjectIdGetDatum(schemaid));
     908       10876 :     for (i = 0; i < pubschlist->n_members; i++)
     909             :     {
     910         114 :         HeapTuple   tup = &pubschlist->members[i]->tuple;
     911         114 :         Oid         pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
     912             : 
     913         114 :         result = lappend_oid(result, pubid);
     914             :     }
     915             : 
     916       10762 :     ReleaseSysCacheList(pubschlist);
     917             : 
     918       10762 :     return result;
     919             : }
     920             : 
     921             : /*
     922             :  * Get the list of publishable relation oids for a specified schema.
     923             :  */
     924             : List *
     925         460 : GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
     926             : {
     927             :     Relation    classRel;
     928             :     ScanKeyData key[1];
     929             :     TableScanDesc scan;
     930             :     HeapTuple   tuple;
     931         460 :     List       *result = NIL;
     932             : 
     933             :     Assert(OidIsValid(schemaid));
     934             : 
     935         460 :     classRel = table_open(RelationRelationId, AccessShareLock);
     936             : 
     937         460 :     ScanKeyInit(&key[0],
     938             :                 Anum_pg_class_relnamespace,
     939             :                 BTEqualStrategyNumber, F_OIDEQ,
     940             :                 schemaid);
     941             : 
     942             :     /* get all the relations present in the specified schema */
     943         460 :     scan = table_beginscan_catalog(classRel, 1, key);
     944       27404 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
     945             :     {
     946       26944 :         Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
     947       26944 :         Oid         relid = relForm->oid;
     948             :         char        relkind;
     949             : 
     950       26944 :         if (!is_publishable_class(relid, relForm))
     951       11108 :             continue;
     952             : 
     953       15836 :         relkind = get_rel_relkind(relid);
     954       15836 :         if (relkind == RELKIND_RELATION)
     955       15198 :             result = lappend_oid(result, relid);
     956         638 :         else if (relkind == RELKIND_PARTITIONED_TABLE)
     957             :         {
     958         638 :             List       *partitionrels = NIL;
     959             : 
     960             :             /*
     961             :              * It is quite possible that some of the partitions are in a
     962             :              * different schema than the parent table, so we need to get such
     963             :              * partitions separately.
     964             :              */
     965         638 :             partitionrels = GetPubPartitionOptionRelations(partitionrels,
     966             :                                                            pub_partopt,
     967             :                                                            relForm->oid);
     968         638 :             result = list_concat_unique_oid(result, partitionrels);
     969             :         }
     970             :     }
     971             : 
     972         460 :     table_endscan(scan);
     973         460 :     table_close(classRel, AccessShareLock);
     974         460 :     return result;
     975             : }
     976             : 
     977             : /*
     978             :  * Gets the list of all relations published by FOR TABLES IN SCHEMA
     979             :  * publication.
     980             :  */
     981             : List *
     982        1612 : GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
     983             : {
     984        1612 :     List       *result = NIL;
     985        1612 :     List       *pubschemalist = GetPublicationSchemas(pubid);
     986             :     ListCell   *cell;
     987             : 
     988        1682 :     foreach(cell, pubschemalist)
     989             :     {
     990          70 :         Oid         schemaid = lfirst_oid(cell);
     991          70 :         List       *schemaRels = NIL;
     992             : 
     993          70 :         schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt);
     994          70 :         result = list_concat(result, schemaRels);
     995             :     }
     996             : 
     997        1612 :     return result;
     998             : }
     999             : 
    1000             : /*
    1001             :  * Get publication using oid
    1002             :  *
    1003             :  * The Publication struct and its data are palloc'ed here.
    1004             :  */
    1005             : Publication *
    1006        7190 : GetPublication(Oid pubid)
    1007             : {
    1008             :     HeapTuple   tup;
    1009             :     Publication *pub;
    1010             :     Form_pg_publication pubform;
    1011             : 
    1012        7190 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1013        7190 :     if (!HeapTupleIsValid(tup))
    1014           0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
    1015             : 
    1016        7190 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1017             : 
    1018        7190 :     pub = (Publication *) palloc(sizeof(Publication));
    1019        7190 :     pub->oid = pubid;
    1020        7190 :     pub->name = pstrdup(NameStr(pubform->pubname));
    1021        7190 :     pub->alltables = pubform->puballtables;
    1022        7190 :     pub->pubactions.pubinsert = pubform->pubinsert;
    1023        7190 :     pub->pubactions.pubupdate = pubform->pubupdate;
    1024        7190 :     pub->pubactions.pubdelete = pubform->pubdelete;
    1025        7190 :     pub->pubactions.pubtruncate = pubform->pubtruncate;
    1026        7190 :     pub->pubviaroot = pubform->pubviaroot;
    1027             : 
    1028        7190 :     ReleaseSysCache(tup);
    1029             : 
    1030        7190 :     return pub;
    1031             : }
    1032             : 
    1033             : /*
    1034             :  * Get Publication using name.
    1035             :  */
    1036             : Publication *
    1037        2208 : GetPublicationByName(const char *pubname, bool missing_ok)
    1038             : {
    1039             :     Oid         oid;
    1040             : 
    1041        2208 :     oid = get_publication_oid(pubname, missing_ok);
    1042             : 
    1043        2204 :     return OidIsValid(oid) ? GetPublication(oid) : NULL;
    1044             : }
    1045             : 
    1046             : /*
    1047             :  * Get information of the tables in the given publication array.
    1048             :  *
    1049             :  * Returns pubid, relid, column list, row filter for each table.
    1050             :  */
    1051             : Datum
    1052        5476 : pg_get_publication_tables(PG_FUNCTION_ARGS)
    1053             : {
    1054             : #define NUM_PUBLICATION_TABLES_ELEM 4
    1055             :     FuncCallContext *funcctx;
    1056        5476 :     List       *table_infos = NIL;
    1057             : 
    1058             :     /* stuff done only on the first call of the function */
    1059        5476 :     if (SRF_IS_FIRSTCALL())
    1060             :     {
    1061             :         TupleDesc   tupdesc;
    1062             :         MemoryContext oldcontext;
    1063             :         ArrayType  *arr;
    1064             :         Datum      *elems;
    1065             :         int         nelems,
    1066             :                     i;
    1067        1720 :         bool        viaroot = false;
    1068             : 
    1069             :         /* create a function context for cross-call persistence */
    1070        1720 :         funcctx = SRF_FIRSTCALL_INIT();
    1071             : 
    1072             :         /* switch to memory context appropriate for multiple function calls */
    1073        1720 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    1074             : 
    1075             :         /*
    1076             :          * Deconstruct the parameter into elements where each element is a
    1077             :          * publication name.
    1078             :          */
    1079        1720 :         arr = PG_GETARG_ARRAYTYPE_P(0);
    1080        1720 :         deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT,
    1081             :                           &elems, NULL, &nelems);
    1082             : 
    1083             :         /* Get Oids of tables from each publication. */
    1084        3526 :         for (i = 0; i < nelems; i++)
    1085             :         {
    1086             :             Publication *pub_elem;
    1087        1806 :             List       *pub_elem_tables = NIL;
    1088             :             ListCell   *lc;
    1089             : 
    1090        1806 :             pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]), false);
    1091             : 
    1092             :             /*
    1093             :              * Publications support partitioned tables. If
    1094             :              * publish_via_partition_root is false, all changes are replicated
    1095             :              * using leaf partition identity and schema, so we only need
    1096             :              * those. Otherwise, get the partitioned table itself.
    1097             :              */
    1098        1806 :             if (pub_elem->alltables)
    1099         284 :                 pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot);
    1100             :             else
    1101             :             {
    1102             :                 List       *relids,
    1103             :                            *schemarelids;
    1104             : 
    1105        1522 :                 relids = GetPublicationRelations(pub_elem->oid,
    1106        1522 :                                                  pub_elem->pubviaroot ?
    1107        1522 :                                                  PUBLICATION_PART_ROOT :
    1108             :                                                  PUBLICATION_PART_LEAF);
    1109        1522 :                 schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
    1110        1522 :                                                                 pub_elem->pubviaroot ?
    1111        1522 :                                                                 PUBLICATION_PART_ROOT :
    1112             :                                                                 PUBLICATION_PART_LEAF);
    1113        1522 :                 pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
    1114             :             }
    1115             : 
    1116             :             /*
    1117             :              * Record the published table and the corresponding publication so
    1118             :              * that we can get row filters and column lists later.
    1119             :              *
    1120             :              * When a table is published by multiple publications, to obtain
    1121             :              * all row filters and column lists, the structure related to this
    1122             :              * table will be recorded multiple times.
    1123             :              */
    1124        5642 :             foreach(lc, pub_elem_tables)
    1125             :             {
    1126        3836 :                 published_rel *table_info = (published_rel *) palloc(sizeof(published_rel));
    1127             : 
    1128        3836 :                 table_info->relid = lfirst_oid(lc);
    1129        3836 :                 table_info->pubid = pub_elem->oid;
    1130        3836 :                 table_infos = lappend(table_infos, table_info);
    1131             :             }
    1132             : 
    1133             :             /* At least one publication is using publish_via_partition_root. */
    1134        1806 :             if (pub_elem->pubviaroot)
    1135         352 :                 viaroot = true;
    1136             :         }
    1137             : 
    1138             :         /*
    1139             :          * If the publication publishes partition changes via their respective
    1140             :          * root partitioned tables, we must exclude partitions in favor of
    1141             :          * including the root partitioned tables. Otherwise, the function
    1142             :          * could return both the child and parent tables which could cause
    1143             :          * data of the child table to be double-published on the subscriber
    1144             :          * side.
    1145             :          */
    1146        1720 :         if (viaroot)
    1147         336 :             filter_partitions(table_infos);
    1148             : 
    1149             :         /* Construct a tuple descriptor for the result rows. */
    1150        1720 :         tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
    1151        1720 :         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
    1152             :                            OIDOID, -1, 0);
    1153        1720 :         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
    1154             :                            OIDOID, -1, 0);
    1155        1720 :         TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
    1156             :                            INT2VECTOROID, -1, 0);
    1157        1720 :         TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
    1158             :                            PG_NODE_TREEOID, -1, 0);
    1159             : 
    1160        1720 :         funcctx->tuple_desc = BlessTupleDesc(tupdesc);
    1161        1720 :         funcctx->user_fctx = (void *) table_infos;
    1162             : 
    1163        1720 :         MemoryContextSwitchTo(oldcontext);
    1164             :     }
    1165             : 
    1166             :     /* stuff done on every call of the function */
    1167        5476 :     funcctx = SRF_PERCALL_SETUP();
    1168        5476 :     table_infos = (List *) funcctx->user_fctx;
    1169             : 
    1170        5476 :     if (funcctx->call_cntr < list_length(table_infos))
    1171             :     {
    1172        3756 :         HeapTuple   pubtuple = NULL;
    1173             :         HeapTuple   rettuple;
    1174             :         Publication *pub;
    1175        3756 :         published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
    1176        3756 :         Oid         relid = table_info->relid;
    1177        3756 :         Oid         schemaid = get_rel_namespace(relid);
    1178        3756 :         Datum       values[NUM_PUBLICATION_TABLES_ELEM] = {0};
    1179        3756 :         bool        nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
    1180             : 
    1181             :         /*
    1182             :          * Form tuple with appropriate data.
    1183             :          */
    1184             : 
    1185        3756 :         pub = GetPublication(table_info->pubid);
    1186             : 
    1187        3756 :         values[0] = ObjectIdGetDatum(pub->oid);
    1188        3756 :         values[1] = ObjectIdGetDatum(relid);
    1189             : 
    1190             :         /*
    1191             :          * We don't consider row filters or column lists for FOR ALL TABLES or
    1192             :          * FOR TABLES IN SCHEMA publications.
    1193             :          */
    1194        3756 :         if (!pub->alltables &&
    1195        2416 :             !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
    1196             :                                    ObjectIdGetDatum(schemaid),
    1197             :                                    ObjectIdGetDatum(pub->oid)))
    1198        2322 :             pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
    1199             :                                            ObjectIdGetDatum(relid),
    1200             :                                            ObjectIdGetDatum(pub->oid));
    1201             : 
    1202        3756 :         if (HeapTupleIsValid(pubtuple))
    1203             :         {
    1204             :             /* Lookup the column list attribute. */
    1205        2104 :             values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
    1206             :                                         Anum_pg_publication_rel_prattrs,
    1207             :                                         &(nulls[2]));
    1208             : 
    1209             :             /* Null indicates no filter. */
    1210        2104 :             values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
    1211             :                                         Anum_pg_publication_rel_prqual,
    1212             :                                         &(nulls[3]));
    1213             :         }
    1214             :         else
    1215             :         {
    1216        1652 :             nulls[2] = true;
    1217        1652 :             nulls[3] = true;
    1218             :         }
    1219             : 
    1220             :         /* Show all columns when the column list is not specified. */
    1221        3756 :         if (nulls[2])
    1222             :         {
    1223        3522 :             Relation    rel = table_open(relid, AccessShareLock);
    1224        3522 :             int         nattnums = 0;
    1225             :             int16      *attnums;
    1226        3522 :             TupleDesc   desc = RelationGetDescr(rel);
    1227             :             int         i;
    1228             : 
    1229        3522 :             attnums = (int16 *) palloc(desc->natts * sizeof(int16));
    1230             : 
    1231        9424 :             for (i = 0; i < desc->natts; i++)
    1232             :             {
    1233        5902 :                 Form_pg_attribute att = TupleDescAttr(desc, i);
    1234             : 
    1235        5902 :                 if (att->attisdropped || att->attgenerated)
    1236          34 :                     continue;
    1237             : 
    1238        5868 :                 attnums[nattnums++] = att->attnum;
    1239             :             }
    1240             : 
    1241        3522 :             if (nattnums > 0)
    1242             :             {
    1243        3478 :                 values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
    1244        3478 :                 nulls[2] = false;
    1245             :             }
    1246             : 
    1247        3522 :             table_close(rel, AccessShareLock);
    1248             :         }
    1249             : 
    1250        3756 :         rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
    1251             : 
    1252        3756 :         SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
    1253             :     }
    1254             : 
    1255        1720 :     SRF_RETURN_DONE(funcctx);
    1256             : }

Generated by: LCOV version 1.14