LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_publication.c (source / functions) Hit Total Coverage
Test: PostgreSQL 16beta1 Lines: 410 415 98.8 %
Date: 2023-05-30 16:15:03 Functions: 26 26 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_publication.c
       4             :  *      publication C API manipulation
       5             :  *
       6             :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      src/backend/catalog/pg_publication.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/genam.h"
      18             : #include "access/heapam.h"
      19             : #include "access/htup_details.h"
      20             : #include "access/tableam.h"
      21             : #include "access/xact.h"
      22             : #include "catalog/catalog.h"
      23             : #include "catalog/dependency.h"
      24             : #include "catalog/index.h"
      25             : #include "catalog/indexing.h"
      26             : #include "catalog/namespace.h"
      27             : #include "catalog/partition.h"
      28             : #include "catalog/objectaccess.h"
      29             : #include "catalog/objectaddress.h"
      30             : #include "catalog/pg_inherits.h"
      31             : #include "catalog/pg_namespace.h"
      32             : #include "catalog/pg_publication.h"
      33             : #include "catalog/pg_publication_namespace.h"
      34             : #include "catalog/pg_publication_rel.h"
      35             : #include "catalog/pg_type.h"
      36             : #include "commands/publicationcmds.h"
      37             : #include "funcapi.h"
      38             : #include "miscadmin.h"
      39             : #include "utils/array.h"
      40             : #include "utils/builtins.h"
      41             : #include "utils/catcache.h"
      42             : #include "utils/fmgroids.h"
      43             : #include "utils/inval.h"
      44             : #include "utils/lsyscache.h"
      45             : #include "utils/rel.h"
      46             : #include "utils/syscache.h"
      47             : 
      48             : /* Records association between publication and published table */
      49             : typedef struct
      50             : {
      51             :     Oid         relid;          /* OID of published table */
      52             :     Oid         pubid;          /* OID of publication that publishes this
      53             :                                  * table. */
      54             : } published_rel;
      55             : 
      56             : static void publication_translate_columns(Relation targetrel, List *columns,
      57             :                                           int *natts, AttrNumber **attrs);
      58             : 
      59             : /*
      60             :  * Check if relation can be in given publication and throw appropriate
      61             :  * error if not.
      62             :  */
      63             : static void
      64         966 : check_publication_add_relation(Relation targetrel)
      65             : {
      66             :     /* Must be a regular or partitioned table */
      67         966 :     if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
      68         142 :         RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
      69          14 :         ereport(ERROR,
      70             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      71             :                  errmsg("cannot add relation \"%s\" to publication",
      72             :                         RelationGetRelationName(targetrel)),
      73             :                  errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind)));
      74             : 
      75             :     /* Can't be system table */
      76         952 :     if (IsCatalogRelation(targetrel))
      77           6 :         ereport(ERROR,
      78             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      79             :                  errmsg("cannot add relation \"%s\" to publication",
      80             :                         RelationGetRelationName(targetrel)),
      81             :                  errdetail("This operation is not supported for system tables.")));
      82             : 
      83             :     /* UNLOGGED and TEMP relations cannot be part of publication. */
      84         946 :     if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
      85           6 :         ereport(ERROR,
      86             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      87             :                  errmsg("cannot add relation \"%s\" to publication",
      88             :                         RelationGetRelationName(targetrel)),
      89             :                  errdetail("This operation is not supported for temporary tables.")));
      90         940 :     else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
      91           6 :         ereport(ERROR,
      92             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      93             :                  errmsg("cannot add relation \"%s\" to publication",
      94             :                         RelationGetRelationName(targetrel)),
      95             :                  errdetail("This operation is not supported for unlogged tables.")));
      96         934 : }
      97             : 
      98             : /*
      99             :  * Check if schema can be in given publication and throw appropriate error if
     100             :  * not.
     101             :  */
     102             : static void
     103         204 : check_publication_add_schema(Oid schemaid)
     104             : {
     105             :     /* Can't be system namespace */
     106         204 :     if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid))
     107           6 :         ereport(ERROR,
     108             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     109             :                  errmsg("cannot add schema \"%s\" to publication",
     110             :                         get_namespace_name(schemaid)),
     111             :                  errdetail("This operation is not supported for system schemas.")));
     112             : 
     113             :     /* Can't be temporary namespace */
     114         198 :     if (isAnyTempNamespace(schemaid))
     115           0 :         ereport(ERROR,
     116             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     117             :                  errmsg("cannot add schema \"%s\" to publication",
     118             :                         get_namespace_name(schemaid)),
     119             :                  errdetail("Temporary schemas cannot be replicated.")));
     120         198 : }
     121             : 
     122             : /*
     123             :  * Returns true if the relation represented by oid and Form_pg_class
     124             :  * entry is publishable.
     125             :  *
     126             :  * Does same checks as check_publication_add_relation() above, but does not
     127             :  * need relation to be opened and also does not throw errors.
     128             :  *
     129             :  * XXX  This also excludes all tables with relid < FirstNormalObjectId,
     130             :  * ie all tables created during initdb.  This mainly affects the preinstalled
     131             :  * information_schema.  IsCatalogRelationOid() only excludes tables with
     132             :  * relid < FirstUnpinnedObjectId, making that test rather redundant,
     133             :  * but really we should get rid of the FirstNormalObjectId test not
     134             :  * IsCatalogRelationOid.  We can't do so today because we don't want
     135             :  * information_schema tables to be considered publishable; but this test
     136             :  * is really inadequate for that, since the information_schema could be
     137             :  * dropped and reloaded and then it'll be considered publishable.  The best
     138             :  * long-term solution may be to add a "relispublishable" bool to pg_class,
     139             :  * and depend on that instead of OID checks.
     140             :  */
     141             : static bool
     142      586290 : is_publishable_class(Oid relid, Form_pg_class reltuple)
     143             : {
     144      597126 :     return (reltuple->relkind == RELKIND_RELATION ||
     145       10836 :             reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
     146      576598 :         !IsCatalogRelationOid(relid) &&
     147     1172580 :         reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
     148             :         relid >= FirstNormalObjectId;
     149             : }
     150             : 
     151             : /*
     152             :  * Another variant of is_publishable_class(), taking a Relation.
     153             :  */
     154             : bool
     155      540164 : is_publishable_relation(Relation rel)
     156             : {
     157      540164 :     return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
     158             : }
     159             : 
     160             : /*
     161             :  * SQL-callable variant of the above
     162             :  *
     163             :  * This returns null when the relation does not exist.  This is intended to be
     164             :  * used for example in psql to avoid gratuitous errors when there are
     165             :  * concurrent catalog changes.
     166             :  */
     167             : Datum
     168        4180 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
     169             : {
     170        4180 :     Oid         relid = PG_GETARG_OID(0);
     171             :     HeapTuple   tuple;
     172             :     bool        result;
     173             : 
     174        4180 :     tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
     175        4180 :     if (!HeapTupleIsValid(tuple))
     176           0 :         PG_RETURN_NULL();
     177        4180 :     result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
     178        4180 :     ReleaseSysCache(tuple);
     179        4180 :     PG_RETURN_BOOL(result);
     180             : }
     181             : 
     182             : /*
     183             :  * Returns true if the ancestor is in the list of published relations.
     184             :  * Otherwise, returns false.
     185             :  */
     186             : static bool
     187         198 : is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
     188             : {
     189             :     ListCell   *lc;
     190             : 
     191         668 :     foreach(lc, table_infos)
     192             :     {
     193         550 :         Oid         relid = ((published_rel *) lfirst(lc))->relid;
     194             : 
     195         550 :         if (relid == ancestor)
     196          80 :             return true;
     197             :     }
     198             : 
     199         118 :     return false;
     200             : }
     201             : 
     202             : /*
     203             :  * Filter out the partitions whose parent tables are also present in the list.
     204             :  */
     205             : static void
     206         320 : filter_partitions(List *table_infos)
     207             : {
     208             :     ListCell   *lc;
     209             : 
     210         956 :     foreach(lc, table_infos)
     211             :     {
     212         636 :         bool        skip = false;
     213         636 :         List       *ancestors = NIL;
     214             :         ListCell   *lc2;
     215         636 :         published_rel *table_info = (published_rel *) lfirst(lc);
     216             : 
     217         636 :         if (get_rel_relispartition(table_info->relid))
     218         198 :             ancestors = get_partition_ancestors(table_info->relid);
     219             : 
     220         754 :         foreach(lc2, ancestors)
     221             :         {
     222         198 :             Oid         ancestor = lfirst_oid(lc2);
     223             : 
     224         198 :             if (is_ancestor_member_tableinfos(ancestor, table_infos))
     225             :             {
     226          80 :                 skip = true;
     227          80 :                 break;
     228             :             }
     229             :         }
     230             : 
     231         636 :         if (skip)
     232          80 :             table_infos = foreach_delete_current(table_infos, lc);
     233             :     }
     234         320 : }
     235             : 
     236             : /*
     237             :  * Returns true if any schema is associated with the publication, false if no
     238             :  * schema is associated with the publication.
     239             :  */
     240             : bool
     241         250 : is_schema_publication(Oid pubid)
     242             : {
     243             :     Relation    pubschsrel;
     244             :     ScanKeyData scankey;
     245             :     SysScanDesc scan;
     246             :     HeapTuple   tup;
     247         250 :     bool        result = false;
     248             : 
     249         250 :     pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
     250         250 :     ScanKeyInit(&scankey,
     251             :                 Anum_pg_publication_namespace_pnpubid,
     252             :                 BTEqualStrategyNumber, F_OIDEQ,
     253             :                 ObjectIdGetDatum(pubid));
     254             : 
     255         250 :     scan = systable_beginscan(pubschsrel,
     256             :                               PublicationNamespacePnnspidPnpubidIndexId,
     257             :                               true, NULL, 1, &scankey);
     258         250 :     tup = systable_getnext(scan);
     259         250 :     result = HeapTupleIsValid(tup);
     260             : 
     261         250 :     systable_endscan(scan);
     262         250 :     table_close(pubschsrel, AccessShareLock);
     263             : 
     264         250 :     return result;
     265             : }
     266             : 
     267             : /*
     268             :  * Gets the relations based on the publication partition option for a specified
     269             :  * relation.
     270             :  */
     271             : List *
     272        4938 : GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
     273             :                                Oid relid)
     274             : {
     275        4938 :     if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
     276             :         pub_partopt != PUBLICATION_PART_ROOT)
     277         934 :     {
     278         934 :         List       *all_parts = find_all_inheritors(relid, NoLock,
     279             :                                                     NULL);
     280             : 
     281         934 :         if (pub_partopt == PUBLICATION_PART_ALL)
     282         740 :             result = list_concat(result, all_parts);
     283         194 :         else if (pub_partopt == PUBLICATION_PART_LEAF)
     284             :         {
     285             :             ListCell   *lc;
     286             : 
     287         710 :             foreach(lc, all_parts)
     288             :             {
     289         516 :                 Oid         partOid = lfirst_oid(lc);
     290             : 
     291         516 :                 if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
     292         322 :                     result = lappend_oid(result, partOid);
     293             :             }
     294             :         }
     295             :         else
     296             :             Assert(false);
     297             :     }
     298             :     else
     299        4004 :         result = lappend_oid(result, relid);
     300             : 
     301        4938 :     return result;
     302             : }
     303             : 
     304             : /*
     305             :  * Returns the relid of the topmost ancestor that is published via this
     306             :  * publication, if any, and stores its level in *ancestor_level when that
     307             :  * pointer is not NULL; otherwise returns InvalidOid.
     308             :  *
     309             :  * The ancestor_level value allows us to compare the results for multiple
     310             :  * publications, and decide which value is higher up.
     311             :  *
     312             :  * Note that the list of ancestors should be ordered such that the topmost
     313             :  * ancestor is at the end of the list.
     314             :  */
     315             : Oid
     316         432 : GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
     317             : {
     318             :     ListCell   *lc;
     319         432 :     Oid         topmost_relid = InvalidOid;
     320         432 :     int         level = 0;
     321             : 
     322             :     /*
     323             :      * Find the "topmost" ancestor that is in this publication.
     324             :      */
     325         878 :     foreach(lc, ancestors)
     326             :     {
     327         446 :         Oid         ancestor = lfirst_oid(lc);
     328         446 :         List       *apubids = GetRelationPublications(ancestor);
     329         446 :         List       *aschemaPubids = NIL;
     330             : 
     331         446 :         level++;
     332             : 
     333         446 :         if (list_member_oid(apubids, puboid))
     334             :         {
     335         292 :             topmost_relid = ancestor;
     336             : 
     337         292 :             if (ancestor_level)
     338          70 :                 *ancestor_level = level;
     339             :         }
     340             :         else
     341             :         {
     342         154 :             aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
     343         154 :             if (list_member_oid(aschemaPubids, puboid))
     344             :             {
     345          10 :                 topmost_relid = ancestor;
     346             : 
     347          10 :                 if (ancestor_level)
     348          10 :                     *ancestor_level = level;
     349             :             }
     350             :         }
     351             : 
     352         446 :         list_free(apubids);
     353         446 :         list_free(aschemaPubids);
     354             :     }
     355             : 
     356         432 :     return topmost_relid;
     357             : }
     358             : 
     359             : /*
     360             :  * Insert new publication / relation mapping.
     361             :  */
     362             : ObjectAddress
     363         988 : publication_add_relation(Oid pubid, PublicationRelInfo *pri,
     364             :                          bool if_not_exists)
     365             : {
     366             :     Relation    rel;
     367             :     HeapTuple   tup;
     368             :     Datum       values[Natts_pg_publication_rel];
     369             :     bool        nulls[Natts_pg_publication_rel];
     370         988 :     Relation    targetrel = pri->relation;
     371         988 :     Oid         relid = RelationGetRelid(targetrel);
     372             :     Oid         pubreloid;
     373         988 :     Publication *pub = GetPublication(pubid);
     374         988 :     AttrNumber *attarray = NULL;
     375         988 :     int         natts = 0;
     376             :     ObjectAddress myself,
     377             :                 referenced;
     378         988 :     List       *relids = NIL;
     379             : 
     380         988 :     rel = table_open(PublicationRelRelationId, RowExclusiveLock);
     381             : 
     382             :     /*
     383             :      * Check for duplicates. Note that this does not really prevent
     384             :      * duplicates; it's here just to provide a nicer error message in the
     385             :      * common case. The real protection is the unique key on the catalog.
     386             :      */
     387         988 :     if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
     388             :                               ObjectIdGetDatum(pubid)))
     389             :     {
     390          22 :         table_close(rel, RowExclusiveLock);
     391             : 
     392          22 :         if (if_not_exists)
     393          16 :             return InvalidObjectAddress;
     394             : 
     395           6 :         ereport(ERROR,
     396             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     397             :                  errmsg("relation \"%s\" is already member of publication \"%s\"",
     398             :                         RelationGetRelationName(targetrel), pub->name)));
     399             :     }
     400             : 
     401         966 :     check_publication_add_relation(targetrel);
     402             : 
     403             :     /*
     404             :      * Translate column names to attnums and make sure the column list
     405             :      * contains only allowed elements (no system or generated columns etc.).
     406             :      * Also build an array of attnums, for storing in the catalog.
     407             :      */
     408         934 :     publication_translate_columns(pri->relation, pri->columns,
     409             :                                   &natts, &attarray);
     410             : 
     411             :     /* Form a tuple. */
     412         916 :     memset(values, 0, sizeof(values));
     413         916 :     memset(nulls, false, sizeof(nulls));
     414             : 
     415         916 :     pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
     416             :                                    Anum_pg_publication_rel_oid);
     417         916 :     values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid);
     418         916 :     values[Anum_pg_publication_rel_prpubid - 1] =
     419         916 :         ObjectIdGetDatum(pubid);
     420         916 :     values[Anum_pg_publication_rel_prrelid - 1] =
     421         916 :         ObjectIdGetDatum(relid);
     422             : 
     423             :     /* Add qualifications, if available */
     424         916 :     if (pri->whereClause != NULL)
     425         298 :         values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
     426             :     else
     427         618 :         nulls[Anum_pg_publication_rel_prqual - 1] = true;
     428             : 
     429             :     /* Add column list, if available */
     430         916 :     if (pri->columns)
     431         272 :         values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(buildint2vector(attarray, natts));
     432             :     else
     433         644 :         nulls[Anum_pg_publication_rel_prattrs - 1] = true;
     434             : 
     435         916 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     436             : 
     437             :     /* Insert tuple into catalog. */
     438         916 :     CatalogTupleInsert(rel, tup);
     439         916 :     heap_freetuple(tup);
     440             : 
     441             :     /* Register dependencies as needed */
     442         916 :     ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
     443             : 
     444             :     /* Add dependency on the publication */
     445         916 :     ObjectAddressSet(referenced, PublicationRelationId, pubid);
     446         916 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     447             : 
     448             :     /* Add dependency on the relation */
     449         916 :     ObjectAddressSet(referenced, RelationRelationId, relid);
     450         916 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     451             : 
     452             :     /* Add dependency on the objects mentioned in the qualifications */
     453         916 :     if (pri->whereClause)
     454         298 :         recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
     455             :                                         DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
     456             :                                         false);
     457             : 
     458             :     /* Add dependency on the columns, if any are listed */
     459        1392 :     for (int i = 0; i < natts; i++)
     460             :     {
     461         476 :         ObjectAddressSubSet(referenced, RelationRelationId, relid, attarray[i]);
     462         476 :         recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
     463             :     }
     464             : 
     465             :     /* Close the table. */
     466         916 :     table_close(rel, RowExclusiveLock);
     467             : 
     468             :     /*
     469             :      * Invalidate relcache so that publication info is rebuilt.
     470             :      *
     471             :      * For partitioned tables, we must invalidate all partitions contained
     472             :      * in the respective partition hierarchies, not just the one explicitly
     473             :      * mentioned in the publication. This is required because we implicitly
     474             :      * publish the child tables when the parent table is published.
     475             :      */
     476         916 :     relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
     477             :                                             relid);
     478             : 
     479         916 :     InvalidatePublicationRels(relids);
     480             : 
     481         916 :     return myself;
     482             : }
     483             : 
     484             : /* qsort comparator for attnums */
     485             : static int
     486         204 : compare_int16(const void *a, const void *b)
     487             : {
     488         204 :     int         av = *(const int16 *) a;
     489         204 :     int         bv = *(const int16 *) b;
     490             : 
     491             :     /* this can't overflow if int is wider than int16 */
     492         204 :     return (av - bv);
     493             : }
     494             : 
     495             : /*
     496             :  * Translate a list of column names to a sorted array of attribute
     497             :  * numbers (*attrs, with *natts entries); verify that each attribute is
     498             :  * appropriate to have in a publication column list (no system or generated
     499             :  * attributes, no duplicates).  Additional checks with replica identity are
     500             :  * done later; see pub_collist_contains_invalid_column.
     501             :  *
     502             :  * Note that the attribute numbers are *not* offset by
     503             :  * FirstLowInvalidHeapAttributeNumber; system columns are forbidden so this
     504             :  * is okay.
     505             :  */
     506             : static void
     507         934 : publication_translate_columns(Relation targetrel, List *columns,
     508             :                               int *natts, AttrNumber **attrs)
     509             : {
     510         934 :     AttrNumber *attarray = NULL;
     511         934 :     Bitmapset  *set = NULL;
     512             :     ListCell   *lc;
     513         934 :     int         n = 0;
     514         934 :     TupleDesc   tupdesc = RelationGetDescr(targetrel);
     515             : 
     516             :     /* Bail out when no column list defined. */
     517         934 :     if (!columns)
     518         644 :         return;
     519             : 
     520             :     /*
     521             :      * Translate list of columns to attnums. We prohibit system and generated
     522             :      * attributes and make sure there are no duplicate columns.
     523             :      */
     524         290 :     attarray = palloc(sizeof(AttrNumber) * list_length(columns));
     525         784 :     foreach(lc, columns)
     526             :     {
     527         512 :         char       *colname = strVal(lfirst(lc));
     528         512 :         AttrNumber  attnum = get_attnum(RelationGetRelid(targetrel), colname);
     529             : 
     530         512 :         if (attnum == InvalidAttrNumber)
     531           6 :             ereport(ERROR,
     532             :                     errcode(ERRCODE_UNDEFINED_COLUMN),
     533             :                     errmsg("column \"%s\" of relation \"%s\" does not exist",
     534             :                            colname, RelationGetRelationName(targetrel)));
     535             : 
     536         506 :         if (!AttrNumberIsForUserDefinedAttr(attnum))
     537           6 :             ereport(ERROR,
     538             :                     errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     539             :                     errmsg("cannot use system column \"%s\" in publication column list",
     540             :                            colname));
     541             : 
     542         500 :         if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated)
     543           6 :             ereport(ERROR,
     544             :                     errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     545             :                     errmsg("cannot use generated column \"%s\" in publication column list",
     546             :                            colname));
     547             : 
     548         494 :         if (bms_is_member(attnum, set))
     549           0 :             ereport(ERROR,
     550             :                     errcode(ERRCODE_DUPLICATE_OBJECT),
     551             :                     errmsg("duplicate column \"%s\" in publication column list",
     552             :                            colname));
     553             : 
     554         494 :         set = bms_add_member(set, attnum);
     555         494 :         attarray[n++] = attnum;
     556             :     }
     557             : 
     558             :     /* Be tidy, so that the catalog representation is always sorted */
     559         272 :     qsort(attarray, n, sizeof(AttrNumber), compare_int16);
     560             : 
     561         272 :     *natts = n;
     562         272 :     *attrs = attarray;
     563             : 
     564         272 :     bms_free(set);
     565             : }
     566             : 
     567             : /*
     568             :  * Transform a column list (represented by an int16 array Datum) to a bitmapset.
     569             :  *
     570             :  * If columns isn't NULL, add the column numbers to that set and return the resulting set; otherwise build and return a new set.
     571             :  *
     572             :  * If mcxt isn't NULL, build the bitmapset in that context; the previous context is restored before returning.
     573             :  */
     574             : Bitmapset *
     575         428 : pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
     576             : {
     577         428 :     Bitmapset  *result = NULL;
     578             :     ArrayType  *arr;
     579             :     int         nelems;
     580             :     int16      *elems;
     581         428 :     MemoryContext oldcxt = NULL;
     582             : 
     583             :     /*
     584             :      * If an existing bitmap was provided, use it. Otherwise just use NULL and
     585             :      * build a new bitmap.
     586             :      */
     587         428 :     if (columns)
     588           0 :         result = columns;
     589             : 
     590         428 :     arr = DatumGetArrayTypeP(pubcols);
     591         428 :     nelems = ARR_DIMS(arr)[0]; /* only the first dimension is read; assumes a non-empty 1-D array -- TODO confirm */
     592         428 :     elems = (int16 *) ARR_DATA_PTR(arr);
     593             : 
     594             :     /* If a memory context was specified, switch to it. */
     595         428 :     if (mcxt)
     596          74 :         oldcxt = MemoryContextSwitchTo(mcxt);
     597             : 
     598        1188 :     for (int i = 0; i < nelems; i++)
     599         760 :         result = bms_add_member(result, elems[i]);
     600             : 
     601         428 :     if (mcxt)
     602          74 :         MemoryContextSwitchTo(oldcxt);
     603             : 
     604         428 :     return result;
     605             : }
     606             : 
     607             : /*
     608             :  * Insert new publication / schema mapping and return its object address; returns InvalidObjectAddress if the mapping already exists and if_not_exists is true, and raises an error if it exists and if_not_exists is false.
     609             :  */
     610             : ObjectAddress
     611         222 : publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
     612             : {
     613             :     Relation    rel;
     614             :     HeapTuple   tup;
     615             :     Datum       values[Natts_pg_publication_namespace];
     616             :     bool        nulls[Natts_pg_publication_namespace];
     617             :     Oid         psschid;
     618         222 :     Publication *pub = GetPublication(pubid);
     619         222 :     List       *schemaRels = NIL;
     620             :     ObjectAddress myself,
     621             :                 referenced;
     622             : 
     623         222 :     rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
     624             : 
     625             :     /*
     626             :      * Check for duplicates. Note that this does not really prevent
     627             :      * duplicates; it's here just to provide a nicer error message in the
     628             :      * common case. The real protection is the unique key on the catalog.
     629             :      */
     630         222 :     if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
     631             :                               ObjectIdGetDatum(schemaid),
     632             :                               ObjectIdGetDatum(pubid)))
     633             :     {
     634          18 :         table_close(rel, RowExclusiveLock);
     635             : 
     636          18 :         if (if_not_exists)
     637          12 :             return InvalidObjectAddress;
     638             : 
     639           6 :         ereport(ERROR,
     640             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     641             :                  errmsg("schema \"%s\" is already member of publication \"%s\"",
     642             :                         get_namespace_name(schemaid), pub->name)));
     643             :     }
     644             : 
     645         204 :     check_publication_add_schema(schemaid);
     646             : 
     647             :     /* Form a tuple with a freshly assigned oid plus the publication and schema oids */
     648         198 :     memset(values, 0, sizeof(values));
     649         198 :     memset(nulls, false, sizeof(nulls));
     650             : 
     651         198 :     psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId,
     652             :                                  Anum_pg_publication_namespace_oid);
     653         198 :     values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid);
     654         198 :     values[Anum_pg_publication_namespace_pnpubid - 1] =
     655         198 :         ObjectIdGetDatum(pubid);
     656         198 :     values[Anum_pg_publication_namespace_pnnspid - 1] =
     657         198 :         ObjectIdGetDatum(schemaid);
     658             : 
     659         198 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     660             : 
     661             :     /* Insert tuple into catalog */
     662         198 :     CatalogTupleInsert(rel, tup);
     663         198 :     heap_freetuple(tup);
     664             : 
     665         198 :     ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid);
     666             : 
     667             :     /* Add dependency on the publication */
     668         198 :     ObjectAddressSet(referenced, PublicationRelationId, pubid);
     669         198 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     670             : 
     671             :     /* Add dependency on the schema */
     672         198 :     ObjectAddressSet(referenced, NamespaceRelationId, schemaid);
     673         198 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     674             : 
     675             :     /* Close the table */
     676         198 :     table_close(rel, RowExclusiveLock);
     677             : 
     678             :     /*
     679             :      * Invalidate relcache so that publication info is rebuilt. See
     680             :      * publication_add_relation for why we need to consider all the
     681             :      * partitions.
     682             :      */
     683         198 :     schemaRels = GetSchemaPublicationRelations(schemaid,
     684             :                                                PUBLICATION_PART_ALL);
     685         198 :     InvalidatePublicationRels(schemaRels);
     686             : 
     687         198 :     return myself;
     688             : }
     689             : 
     690             : /* Gets list of publication oids for a relation, taken from the PUBLICATIONRELMAP syscache entries for relid; no other source is consulted here. */
     691             : List *
     692       10584 : GetRelationPublications(Oid relid)
     693             : {
     694       10584 :     List       *result = NIL;
     695             :     CatCList   *pubrellist;
     696             :     int         i;
     697             : 
     698             :     /* Find all publications associated with the relation. */
     699       10584 :     pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
     700             :                                      ObjectIdGetDatum(relid));
     701       11968 :     for (i = 0; i < pubrellist->n_members; i++)
     702             :     {
     703        1384 :         HeapTuple   tup = &pubrellist->members[i]->tuple;
     704        1384 :         Oid         pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
     705             : 
     706        1384 :         result = lappend_oid(result, pubid);
     707             :     }
     708             : 
     709       10584 :     ReleaseSysCacheList(pubrellist);
     710             : 
     711       10584 :     return result;
     712             : }
     713             : 
     714             : /*
     715             :  * Gets list of relation oids for a publication, sorted and de-duplicated.
     716             :  *
     717             :  * This should only be used for FOR TABLE publications; FOR ALL TABLES
     718             :  * publications should use GetAllTablesPublicationRelations().
     719             :  */
     720             : List *
     721        2014 : GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
     722             : {
     723             :     List       *result;
     724             :     Relation    pubrelsrel;
     725             :     ScanKeyData scankey;
     726             :     SysScanDesc scan;
     727             :     HeapTuple   tup;
     728             : 
     729             :     /* Find all relations associated with the publication. */
     730        2014 :     pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
     731             : 
     732        2014 :     ScanKeyInit(&scankey,
     733             :                 Anum_pg_publication_rel_prpubid,
     734             :                 BTEqualStrategyNumber, F_OIDEQ,
     735             :                 ObjectIdGetDatum(pubid));
     736             : 
     737        2014 :     scan = systable_beginscan(pubrelsrel, PublicationRelPrpubidIndexId,
     738             :                               true, NULL, 1, &scankey);
     739             : 
     740        2014 :     result = NIL;
     741        4712 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     742             :     {
     743             :         Form_pg_publication_rel pubrel;
     744             : 
     745        2698 :         pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
     746        2698 :         result = GetPubPartitionOptionRelations(result, pub_partopt,
     747             :                                                 pubrel->prrelid);
     748             :     }
     749             : 
     750        2014 :     systable_endscan(scan);
     751        2014 :     table_close(pubrelsrel, AccessShareLock);
     752             : 
     753             :     /* Now sort and de-duplicate the result list */
     754        2014 :     list_sort(result, list_oid_cmp);
     755        2014 :     list_deduplicate_oid(result);
     756             : 
     757        2014 :     return result;
     758             : }
     759             : 
     760             : /*
     761             :  * Gets list of publication oids for publications marked as FOR ALL TABLES.
     762             :  */
     763             : List *
     764        7322 : GetAllTablesPublications(void)
     765             : {
     766             :     List       *result;
     767             :     Relation    rel;
     768             :     ScanKeyData scankey;
     769             :     SysScanDesc scan;
     770             :     HeapTuple   tup;
     771             : 
     772             :     /* Find all publications that are marked as for all tables (puballtables = true). */
     773        7322 :     rel = table_open(PublicationRelationId, AccessShareLock);
     774             : 
     775        7322 :     ScanKeyInit(&scankey,
     776             :                 Anum_pg_publication_puballtables,
     777             :                 BTEqualStrategyNumber, F_BOOLEQ,
     778             :                 BoolGetDatum(true));
     779             : 
     780        7322 :     scan = systable_beginscan(rel, InvalidOid, false, /* no index is named here; presumably a plain catalog scan filtered by the key -- confirm */
     781             :                               NULL, 1, &scankey);
     782             : 
     783        7322 :     result = NIL;
     784        7468 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     785             :     {
     786         146 :         Oid         oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
     787             : 
     788         146 :         result = lappend_oid(result, oid);
     789             :     }
     790             : 
     791        7322 :     systable_endscan(scan);
     792        7322 :     table_close(rel, AccessShareLock);
     793             : 
     794        7322 :     return result;
     795             : }
     796             : 
     797             : /*
     798             :  * Gets list of all relations published by FOR ALL TABLES publication(s).
     799             :  *
     800             :  * If the publication publishes partition changes via their respective root
     801             :  * partitioned tables, we must exclude partitions in favor of including the
     802             :  * root partitioned tables.
     803             :  */
     804             : List *
     805         254 : GetAllTablesPublicationRelations(bool pubviaroot)
     806             : {
     807             :     Relation    classRel;
     808             :     ScanKeyData key[1];
     809             :     TableScanDesc scan;
     810             :     HeapTuple   tuple;
     811         254 :     List       *result = NIL;
     812             : 
     813         254 :     classRel = table_open(RelationRelationId, AccessShareLock);
     814             : 
     815         254 :     ScanKeyInit(&key[0], /* first pass: pg_class rows with relkind RELKIND_RELATION */
     816             :                 Anum_pg_class_relkind,
     817             :                 BTEqualStrategyNumber, F_CHAREQ,
     818             :                 CharGetDatum(RELKIND_RELATION));
     819             : 
     820         254 :     scan = table_beginscan_catalog(classRel, 1, key);
     821             : 
     822       18930 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
     823             :     {
     824       18676 :         Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
     825       18676 :         Oid         relid = relForm->oid;
     826             : 
     827       18676 :         if (is_publishable_class(relid, relForm) &&
     828        1392 :             !(relForm->relispartition && pubviaroot)) /* with pubviaroot, partitions are left out of this pass */
     829        1210 :             result = lappend_oid(result, relid);
     830             :     }
     831             : 
     832         254 :     table_endscan(scan);
     833             : 
     834         254 :     if (pubviaroot)
     835             :     {
     836          26 :         ScanKeyInit(&key[0], /* second pass: pg_class rows with relkind RELKIND_PARTITIONED_TABLE */
     837             :                     Anum_pg_class_relkind,
     838             :                     BTEqualStrategyNumber, F_CHAREQ,
     839             :                     CharGetDatum(RELKIND_PARTITIONED_TABLE));
     840             : 
     841          26 :         scan = table_beginscan_catalog(classRel, 1, key);
     842             : 
     843         156 :         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
     844             :         {
     845         130 :             Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
     846         130 :             Oid         relid = relForm->oid;
     847             : 
     848         130 :             if (is_publishable_class(relid, relForm) &&
     849         130 :                 !relForm->relispartition) /* keep only partitioned tables that are not themselves partitions */
     850         104 :                 result = lappend_oid(result, relid);
     851             :         }
     852             : 
     853          26 :         table_endscan(scan);
     854             :     }
     855             : 
     856         254 :     table_close(classRel, AccessShareLock);
     857         254 :     return result;
     858             : }
     859             : 
     860             : /*
     861             :  * Gets the list of schema oids for a publication.
     862             :  *
     863             :  * This should only be used for FOR TABLES IN SCHEMA publications.
     864             :  */
     865             : List *
     866        1944 : GetPublicationSchemas(Oid pubid)
     867             : {
     868        1944 :     List       *result = NIL;
     869             :     Relation    pubschsrel;
     870             :     ScanKeyData scankey;
     871             :     SysScanDesc scan;
     872             :     HeapTuple   tup;
     873             : 
     874             :     /* Find all schemas associated with the publication */
     875        1944 :     pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
     876             : 
     877        1944 :     ScanKeyInit(&scankey,
     878             :                 Anum_pg_publication_namespace_pnpubid,
     879             :                 BTEqualStrategyNumber, F_OIDEQ,
     880             :                 ObjectIdGetDatum(pubid));
     881             : 
     882        1944 :     scan = systable_beginscan(pubschsrel,
     883             :                               PublicationNamespacePnnspidPnpubidIndexId,
     884             :                               true, NULL, 1, &scankey);
     885        2044 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     886             :     {
     887             :         Form_pg_publication_namespace pubsch;
     888             : 
     889         100 :         pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
     890             : 
     891         100 :         result = lappend_oid(result, pubsch->pnnspid);
     892             :     }
     893             : 
     894        1944 :     systable_endscan(scan);
     895        1944 :     table_close(pubschsrel, AccessShareLock);
     896             : 
     897        1944 :     return result;
     898             : }
     899             : 
     900             : /*
     901             :  * Gets the list of publication oids associated with a specified schema, taken from the PUBLICATIONNAMESPACEMAP syscache entries for schemaid.
     902             :  */
     903             : List *
     904       10260 : GetSchemaPublications(Oid schemaid)
     905             : {
     906       10260 :     List       *result = NIL;
     907             :     CatCList   *pubschlist;
     908             :     int         i;
     909             : 
     910             :     /* Find all publications associated with the schema */
     911       10260 :     pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
     912             :                                      ObjectIdGetDatum(schemaid));
     913       10374 :     for (i = 0; i < pubschlist->n_members; i++)
     914             :     {
     915         114 :         HeapTuple   tup = &pubschlist->members[i]->tuple;
     916         114 :         Oid         pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
     917             : 
     918         114 :         result = lappend_oid(result, pubid);
     919             :     }
     920             : 
     921       10260 :     ReleaseSysCacheList(pubschlist);
     922             : 
     923       10260 :     return result;
     924             : }
     925             : 
     926             : /*
     927             :  * Get the list of publishable relation oids for a specified schema; plain tables are added directly, partitioned tables are expanded via GetPubPartitionOptionRelations() according to pub_partopt.
     928             :  */
     929             : List *
     930         460 : GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
     931             : {
     932             :     Relation    classRel;
     933             :     ScanKeyData key[1];
     934             :     TableScanDesc scan;
     935             :     HeapTuple   tuple;
     936         460 :     List       *result = NIL;
     937             : 
     938             :     Assert(OidIsValid(schemaid));
     939             : 
     940         460 :     classRel = table_open(RelationRelationId, AccessShareLock);
     941             : 
     942         460 :     ScanKeyInit(&key[0],
     943             :                 Anum_pg_class_relnamespace,
     944             :                 BTEqualStrategyNumber, F_OIDEQ,
     945             :                 schemaid); /* NOTE(review): passed as a bare Oid; the other OID scan keys visible nearby wrap the value in ObjectIdGetDatum() */
     946             : 
     947             :     /* get all the relations present in the specified schema */
     948         460 :     scan = table_beginscan_catalog(classRel, 1, key);
     949       23600 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
     950             :     {
     951       23140 :         Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
     952       23140 :         Oid         relid = relForm->oid;
     953             :         char        relkind;
     954             : 
     955       23140 :         if (!is_publishable_class(relid, relForm))
     956        9232 :             continue;
     957             : 
     958       13908 :         relkind = get_rel_relkind(relid); /* NOTE(review): relForm is the pg_class row for relid, so relForm->relkind presumably holds the same value -- confirm */
     959       13908 :         if (relkind == RELKIND_RELATION)
     960       13368 :             result = lappend_oid(result, relid);
     961         540 :         else if (relkind == RELKIND_PARTITIONED_TABLE)
     962             :         {
     963         540 :             List       *partitionrels = NIL;
     964             : 
     965             :             /*
     966             :              * It is quite possible that some of the partitions are in a
     967             :              * different schema than the parent table, so we need to get such
     968             :              * partitions separately.
     969             :              */
     970         540 :             partitionrels = GetPubPartitionOptionRelations(partitionrels,
     971             :                                                            pub_partopt,
     972             :                                                            relForm->oid);
     973         540 :             result = list_concat_unique_oid(result, partitionrels);
     974             :         }
     975             :     }
     976             : 
     977         460 :     table_endscan(scan);
     978         460 :     table_close(classRel, AccessShareLock);
     979         460 :     return result;
     980             : }
     981             : 
     982             : /*
     983             :  * Gets the list of all relations published by a FOR TABLES IN SCHEMA
     984             :  * publication, by concatenating the per-schema lists of each of its schemas.
     985             :  */
     986             : List *
     987        1552 : GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
     988             : {
     989        1552 :     List       *result = NIL;
     990        1552 :     List       *pubschemalist = GetPublicationSchemas(pubid);
     991             :     ListCell   *cell;
     992             : 
     993        1622 :     foreach(cell, pubschemalist)
     994             :     {
     995          70 :         Oid         schemaid = lfirst_oid(cell);
     996          70 :         List       *schemaRels = NIL;
     997             : 
     998          70 :         schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt);
     999          70 :         result = list_concat(result, schemaRels); /* NOTE(review): plain concat, no de-duplication here; presumably callers handle repeats -- confirm */
    1000             :     }
    1001             : 
    1002        1552 :     return result;
    1003             : }
    1004             : 
    1005             : /*
    1006             :  * Get publication using oid; raises elog(ERROR) if the PUBLICATIONOID syscache lookup fails.
    1007             :  *
    1008             :  * The Publication struct and its data are palloc'ed here.
    1009             :  */
    1010             : Publication *
    1011        6890 : GetPublication(Oid pubid)
    1012             : {
    1013             :     HeapTuple   tup;
    1014             :     Publication *pub;
    1015             :     Form_pg_publication pubform;
    1016             : 
    1017        6890 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1018        6890 :     if (!HeapTupleIsValid(tup))
    1019           0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
    1020             : 
    1021        6890 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1022             : 
    1023        6890 :     pub = (Publication *) palloc(sizeof(Publication));
    1024        6890 :     pub->oid = pubid;
    1025        6890 :     pub->name = pstrdup(NameStr(pubform->pubname)); /* copy the name so it does not point into the cache tuple released below */
    1026        6890 :     pub->alltables = pubform->puballtables;
    1027        6890 :     pub->pubactions.pubinsert = pubform->pubinsert;
    1028        6890 :     pub->pubactions.pubupdate = pubform->pubupdate;
    1029        6890 :     pub->pubactions.pubdelete = pubform->pubdelete;
    1030        6890 :     pub->pubactions.pubtruncate = pubform->pubtruncate;
    1031        6890 :     pub->pubviaroot = pubform->pubviaroot;
    1032             : 
    1033        6890 :     ReleaseSysCache(tup);
    1034             : 
    1035        6890 :     return pub;
    1036             : }
    1037             : 
    1038             : /*
    1039             :  * Get Publication using name; returns NULL when get_publication_oid() yields an invalid oid (presumably only when missing_ok is true -- confirm).
    1040             :  */
    1041             : Publication *
    1042        2086 : GetPublicationByName(const char *pubname, bool missing_ok)
    1043             : {
    1044             :     Oid         oid;
    1045             : 
    1046        2086 :     oid = get_publication_oid(pubname, missing_ok);
    1047             : 
    1048        2084 :     return OidIsValid(oid) ? GetPublication(oid) : NULL;
    1049             : }
    1050             : 
    1051             : /*
    1052             :  * Get information of the tables in the given publication array.
    1053             :  *
    1054             :  * Returns pubid, relid, column list, row filter for each table.
    1055             :  */
    1056             : Datum
    1057        5224 : pg_get_publication_tables(PG_FUNCTION_ARGS)
    1058             : {
    1059             : #define NUM_PUBLICATION_TABLES_ELEM 4
    1060             :     FuncCallContext *funcctx;
    1061        5224 :     List       *table_infos = NIL;
    1062             : 
    1063             :     /* stuff done only on the first call of the function */
    1064        5224 :     if (SRF_IS_FIRSTCALL())
    1065             :     {
    1066             :         TupleDesc   tupdesc;
    1067             :         MemoryContext oldcontext;
    1068             :         ArrayType  *arr;
    1069             :         Datum      *elems;
    1070             :         int         nelems,
    1071             :                     i;
    1072        1628 :         bool        viaroot = false;
    1073             : 
    1074             :         /* create a function context for cross-call persistence */
    1075        1628 :         funcctx = SRF_FIRSTCALL_INIT();
    1076             : 
    1077             :         /* switch to memory context appropriate for multiple function calls */
    1078        1628 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    1079             : 
    1080             :         /*
    1081             :          * Deconstruct the parameter into elements where each element is a
    1082             :          * publication name.
    1083             :          */
    1084        1628 :         arr = PG_GETARG_ARRAYTYPE_P(0);
    1085        1628 :         deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT,
    1086             :                           &elems, NULL, &nelems);
    1087             : 
    1088             :         /* Get Oids of tables from each publication. */
    1089        3344 :         for (i = 0; i < nelems; i++)
    1090             :         {
    1091             :             Publication *pub_elem;
    1092        1716 :             List       *pub_elem_tables = NIL;
    1093             :             ListCell   *lc;
    1094             : 
    1095        1716 :             pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]), false);
    1096             : 
    1097             :             /*
    1098             :              * Publications support partitioned tables. If
    1099             :              * publish_via_partition_root is false, all changes are replicated
    1100             :              * using leaf partition identity and schema, so we only need
    1101             :              * those. Otherwise, get the partitioned table itself.
    1102             :              */
    1103        1716 :             if (pub_elem->alltables)
    1104         254 :                 pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot);
    1105             :             else
    1106             :             {
    1107             :                 List       *relids,
    1108             :                            *schemarelids;
    1109             : 
    1110        1462 :                 relids = GetPublicationRelations(pub_elem->oid,
    1111        1462 :                                                  pub_elem->pubviaroot ?
    1112        1462 :                                                  PUBLICATION_PART_ROOT :
    1113             :                                                  PUBLICATION_PART_LEAF);
    1114        1462 :                 schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
    1115        1462 :                                                                 pub_elem->pubviaroot ?
    1116        1462 :                                                                 PUBLICATION_PART_ROOT :
    1117             :                                                                 PUBLICATION_PART_LEAF);
    1118        1462 :                 pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
    1119             :             }
    1120             : 
    1121             :             /*
    1122             :              * Record the published table and the corresponding publication so
    1123             :              * that we can get row filters and column lists later.
    1124             :              *
    1125             :              * When a table is published by multiple publications, to obtain
    1126             :              * all row filters and column lists, the structure related to this
    1127             :              * table will be recorded multiple times.
    1128             :              */
    1129        5392 :             foreach(lc, pub_elem_tables)
    1130             :             {
    1131        3676 :                 published_rel *table_info = (published_rel *) palloc(sizeof(published_rel));
    1132             : 
    1133        3676 :                 table_info->relid = lfirst_oid(lc);
    1134        3676 :                 table_info->pubid = pub_elem->oid;
    1135        3676 :                 table_infos = lappend(table_infos, table_info);
    1136             :             }
    1137             : 
    1138             :             /* At least one publication is using publish_via_partition_root. */
    1139        1716 :             if (pub_elem->pubviaroot)
    1140         336 :                 viaroot = true;
    1141             :         }
    1142             : 
    1143             :         /*
    1144             :          * If the publication publishes partition changes via their respective
    1145             :          * root partitioned tables, we must exclude partitions in favor of
    1146             :          * including the root partitioned tables. Otherwise, the function
    1147             :          * could return both the child and parent tables which could cause
    1148             :          * data of the child table to be double-published on the subscriber
    1149             :          * side.
    1150             :          */
    1151        1628 :         if (viaroot)
    1152         320 :             filter_partitions(table_infos);
    1153             : 
    1154             :         /* Construct a tuple descriptor for the result rows. */
    1155        1628 :         tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
    1156        1628 :         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
    1157             :                            OIDOID, -1, 0);
    1158        1628 :         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
    1159             :                            OIDOID, -1, 0);
    1160        1628 :         TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
    1161             :                            INT2VECTOROID, -1, 0);
    1162        1628 :         TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
    1163             :                            PG_NODE_TREEOID, -1, 0);
    1164             : 
    1165        1628 :         funcctx->tuple_desc = BlessTupleDesc(tupdesc);
    1166        1628 :         funcctx->user_fctx = (void *) table_infos;
    1167             : 
    1168        1628 :         MemoryContextSwitchTo(oldcontext);
    1169             :     }
    1170             : 
    1171             :     /* stuff done on every call of the function */
    1172        5224 :     funcctx = SRF_PERCALL_SETUP();
    1173        5224 :     table_infos = (List *) funcctx->user_fctx;
    1174             : 
    1175        5224 :     if (funcctx->call_cntr < list_length(table_infos))
    1176             :     {
    1177        3596 :         HeapTuple   pubtuple = NULL;
    1178             :         HeapTuple   rettuple;
    1179             :         Publication *pub;
    1180        3596 :         published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
    1181        3596 :         Oid         relid = table_info->relid;
    1182        3596 :         Oid         schemaid = get_rel_namespace(relid);
    1183        3596 :         Datum       values[NUM_PUBLICATION_TABLES_ELEM] = {0};
    1184        3596 :         bool        nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
    1185             : 
    1186             :         /*
    1187             :          * Form tuple with appropriate data.
    1188             :          */
    1189             : 
    1190        3596 :         pub = GetPublication(table_info->pubid);
    1191             : 
    1192        3596 :         values[0] = ObjectIdGetDatum(pub->oid);
    1193        3596 :         values[1] = ObjectIdGetDatum(relid);
    1194             : 
    1195             :         /*
    1196             :          * We don't consider row filters or column lists for FOR ALL TABLES or
    1197             :          * FOR TABLES IN SCHEMA publications.
    1198             :          */
    1199        3596 :         if (!pub->alltables &&
    1200        2282 :             !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
    1201             :                                    ObjectIdGetDatum(schemaid),
    1202             :                                    ObjectIdGetDatum(pub->oid)))
    1203        2188 :             pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
    1204             :                                            ObjectIdGetDatum(relid),
    1205             :                                            ObjectIdGetDatum(pub->oid));
    1206             : 
    1207        3596 :         if (HeapTupleIsValid(pubtuple))
    1208             :         {
    1209             :             /* Lookup the column list attribute. */
    1210        1970 :             values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
    1211             :                                         Anum_pg_publication_rel_prattrs,
    1212             :                                         &(nulls[2]));
    1213             : 
    1214             :             /* Null indicates no filter. */
    1215        1970 :             values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
    1216             :                                         Anum_pg_publication_rel_prqual,
    1217             :                                         &(nulls[3]));
    1218             :         }
    1219             :         else
    1220             :         {
    1221        1626 :             nulls[2] = true;
    1222        1626 :             nulls[3] = true;
    1223             :         }
    1224             : 
    1225             :         /* Show all columns when the column list is not specified. */
    1226        3596 :         if (nulls[2])
    1227             :         {
    1228        3360 :             Relation    rel = table_open(relid, AccessShareLock);
    1229        3360 :             int         nattnums = 0;
    1230             :             int16      *attnums;
    1231        3360 :             TupleDesc   desc = RelationGetDescr(rel);
    1232             :             int         i;
    1233             : 
    1234        3360 :             attnums = (int16 *) palloc(desc->natts * sizeof(int16));
    1235             : 
    1236        9120 :             for (i = 0; i < desc->natts; i++)
    1237             :             {
    1238        5760 :                 Form_pg_attribute att = TupleDescAttr(desc, i);
    1239             : 
    1240        5760 :                 if (att->attisdropped || att->attgenerated)
    1241          34 :                     continue;
    1242             : 
    1243        5726 :                 attnums[nattnums++] = att->attnum;
    1244             :             }
    1245             : 
    1246        3360 :             if (nattnums > 0)
    1247             :             {
    1248        3360 :                 values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
    1249        3360 :                 nulls[2] = false;
    1250             :             }
    1251             : 
    1252        3360 :             table_close(rel, AccessShareLock);
    1253             :         }
    1254             : 
    1255        3596 :         rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
    1256             : 
    1257        3596 :         SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
    1258             :     }
    1259             : 
    1260        1628 :     SRF_RETURN_DONE(funcctx);
    1261             : }

Generated by: LCOV version 1.14