LCOV - code coverage report
Current view: top level - src/backend/replication/logical - relation.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 186 213 87.3 %
Date: 2020-06-01 00:06:26 Functions: 12 13 92.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * relation.c
       3             :  *     PostgreSQL logical replication
       4             :  *
       5             :  * Copyright (c) 2016-2020, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/relation.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file contains helper functions for logical replication relation
      12             :  *    mapping cache.
      13             :  *
      14             :  *-------------------------------------------------------------------------
      15             :  */
      16             : 
      17             : #include "postgres.h"
      18             : 
      19             : #include "access/sysattr.h"
      20             : #include "access/table.h"
      21             : #include "catalog/namespace.h"
      22             : #include "catalog/pg_subscription_rel.h"
      23             : #include "executor/executor.h"
      24             : #include "nodes/makefuncs.h"
      25             : #include "replication/logicalrelation.h"
      26             : #include "replication/worker_internal.h"
      27             : #include "utils/builtins.h"
      28             : #include "utils/inval.h"
      29             : #include "utils/lsyscache.h"
      30             : #include "utils/memutils.h"
      31             : #include "utils/syscache.h"
      32             : 
      33             : static MemoryContext LogicalRepRelMapContext = NULL;
      34             : 
      35             : static HTAB *LogicalRepRelMap = NULL;
      36             : static HTAB *LogicalRepTypMap = NULL;
      37             : 
      38             : /*
      39             :  * Partition map (LogicalRepPartMap)
      40             :  *
      41             :  * When a partitioned table is used as replication target, replicated
      42             :  * operations are actually performed on its leaf partitions, which requires
      43             :  * the partitions to also be mapped to the remote relation.  Parent's entry
      44             :  * (LogicalRepRelMapEntry) cannot be used as-is for all partitions, because
      45             :  * individual partitions may have different attribute numbers, which means
      46             :  * attribute mappings to remote relation's attributes must be maintained
      47             :  * separately for each partition.
      48             :  */
      49             : static MemoryContext LogicalRepPartMapContext = NULL;
      50             : static HTAB *LogicalRepPartMap = NULL;
      51             : typedef struct LogicalRepPartMapEntry
      52             : {
      53             :     Oid         partoid;        /* LogicalRepPartMap's key */
      54             :     LogicalRepRelMapEntry relmapentry;
      55             : } LogicalRepPartMapEntry;
      56             : 
      57             : /*
      58             :  * Relcache invalidation callback for our relation map cache.
      59             :  */
      60             : static void
      61         890 : logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
      62             : {
      63             :     LogicalRepRelMapEntry *entry;
      64             : 
      65             :     /* Just to be sure. */
      66         890 :     if (LogicalRepRelMap == NULL)
      67           0 :         return;
      68             : 
      69         890 :     if (reloid != InvalidOid)
      70             :     {
      71             :         HASH_SEQ_STATUS status;
      72             : 
      73         890 :         hash_seq_init(&status, LogicalRepRelMap);
      74             : 
      75             :         /* TODO, use inverse lookup hashtable? */
      76        4088 :         while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
      77             :         {
      78        3240 :             if (entry->localreloid == reloid)
      79             :             {
      80          42 :                 entry->localreloid = InvalidOid;
      81          42 :                 hash_seq_term(&status);
      82          42 :                 break;
      83             :             }
      84             :         }
      85             :     }
      86             :     else
      87             :     {
      88             :         /* invalidate all cache entries */
      89             :         HASH_SEQ_STATUS status;
      90             : 
      91           0 :         hash_seq_init(&status, LogicalRepRelMap);
      92             : 
      93           0 :         while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
      94           0 :             entry->localreloid = InvalidOid;
      95             :     }
      96             : }
      97             : 
      98             : /*
      99             :  * Initialize the relation map cache.
     100             :  */
     101             : static void
     102         140 : logicalrep_relmap_init(void)
     103             : {
     104             :     HASHCTL     ctl;
     105             : 
     106         140 :     if (!LogicalRepRelMapContext)
     107         140 :         LogicalRepRelMapContext =
     108         140 :             AllocSetContextCreate(CacheMemoryContext,
     109             :                                   "LogicalRepRelMapContext",
     110             :                                   ALLOCSET_DEFAULT_SIZES);
     111             : 
     112             :     /* Initialize the relation hash table. */
     113        1960 :     MemSet(&ctl, 0, sizeof(ctl));
     114         140 :     ctl.keysize = sizeof(LogicalRepRelId);
     115         140 :     ctl.entrysize = sizeof(LogicalRepRelMapEntry);
     116         140 :     ctl.hcxt = LogicalRepRelMapContext;
     117             : 
     118         140 :     LogicalRepRelMap = hash_create("logicalrep relation map cache", 128, &ctl,
     119             :                                    HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
     120             : 
     121             :     /* Initialize the type hash table. */
     122        1960 :     MemSet(&ctl, 0, sizeof(ctl));
     123         140 :     ctl.keysize = sizeof(Oid);
     124         140 :     ctl.entrysize = sizeof(LogicalRepTyp);
     125         140 :     ctl.hcxt = LogicalRepRelMapContext;
     126             : 
     127             :     /* This will usually be small. */
     128         140 :     LogicalRepTypMap = hash_create("logicalrep type map cache", 2, &ctl,
     129             :                                    HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
     130             : 
     131             :     /* Watch for invalidation events. */
     132         140 :     CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
     133             :                                   (Datum) 0);
     134         140 : }
     135             : 
     136             : /*
     137             :  * Free the entry of a relation map cache.
     138             :  */
     139             : static void
     140          56 : logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
     141             : {
     142             :     LogicalRepRelation *remoterel;
     143             : 
     144          56 :     remoterel = &entry->remoterel;
     145             : 
     146          56 :     pfree(remoterel->nspname);
     147          56 :     pfree(remoterel->relname);
     148             : 
     149          56 :     if (remoterel->natts > 0)
     150             :     {
     151             :         int         i;
     152             : 
     153         162 :         for (i = 0; i < remoterel->natts; i++)
     154         106 :             pfree(remoterel->attnames[i]);
     155             : 
     156          56 :         pfree(remoterel->attnames);
     157          56 :         pfree(remoterel->atttyps);
     158             :     }
     159          56 :     bms_free(remoterel->attkeys);
     160             : 
     161          56 :     if (entry->attrmap)
     162          56 :         pfree(entry->attrmap);
     163          56 : }
     164             : 
     165             : /*
     166             :  * Add new entry or update existing entry in the relation map cache.
     167             :  *
     168             :  * Called when new relation mapping is sent by the publisher to update
     169             :  * our expected view of incoming data from said publisher.
     170             :  */
     171             : void
     172         272 : logicalrep_relmap_update(LogicalRepRelation *remoterel)
     173             : {
     174             :     MemoryContext oldctx;
     175             :     LogicalRepRelMapEntry *entry;
     176             :     bool        found;
     177             :     int         i;
     178             : 
     179         272 :     if (LogicalRepRelMap == NULL)
     180         140 :         logicalrep_relmap_init();
     181             : 
     182             :     /*
     183             :      * HASH_ENTER returns the existing entry if present or creates a new one.
     184             :      */
     185         272 :     entry = hash_search(LogicalRepRelMap, (void *) &remoterel->remoteid,
     186             :                         HASH_ENTER, &found);
     187             : 
     188         272 :     if (found)
     189          56 :         logicalrep_relmap_free_entry(entry);
     190             : 
     191         272 :     memset(entry, 0, sizeof(LogicalRepRelMapEntry));
     192             : 
     193             :     /* Make cached copy of the data */
     194         272 :     oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
     195         272 :     entry->remoterel.remoteid = remoterel->remoteid;
     196         272 :     entry->remoterel.nspname = pstrdup(remoterel->nspname);
     197         272 :     entry->remoterel.relname = pstrdup(remoterel->relname);
     198         272 :     entry->remoterel.natts = remoterel->natts;
     199         272 :     entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
     200         272 :     entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
     201         760 :     for (i = 0; i < remoterel->natts; i++)
     202             :     {
     203         488 :         entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
     204         488 :         entry->remoterel.atttyps[i] = remoterel->atttyps[i];
     205             :     }
     206         272 :     entry->remoterel.replident = remoterel->replident;
     207         272 :     entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
     208         272 :     MemoryContextSwitchTo(oldctx);
     209         272 : }
     210             : 
     211             : /*
     212             :  * Find attribute index in TupleDesc struct by attribute name.
     213             :  *
     214             :  * Returns -1 if not found.
     215             :  */
     216             : static int
     217         564 : logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
     218             : {
     219             :     int         i;
     220             : 
     221         998 :     for (i = 0; i < remoterel->natts; i++)
     222             :     {
     223         894 :         if (strcmp(remoterel->attnames[i], attname) == 0)
     224         460 :             return i;
     225             :     }
     226             : 
     227         104 :     return -1;
     228             : }
     229             : 
     230             : /*
     231             :  * Open the local relation associated with the remote one.
     232             :  *
     233             :  * Optionally rebuilds the Relcache mapping if it was invalidated
     234             :  * by local DDL.
     235             :  */
     236             : LogicalRepRelMapEntry *
     237        1716 : logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
     238             : {
     239             :     LogicalRepRelMapEntry *entry;
     240             :     bool        found;
     241        1716 :     Oid         relid = InvalidOid;
     242             :     LogicalRepRelation *remoterel;
     243             : 
     244        1716 :     if (LogicalRepRelMap == NULL)
     245           0 :         logicalrep_relmap_init();
     246             : 
     247             :     /* Search for existing entry. */
     248        1716 :     entry = hash_search(LogicalRepRelMap, (void *) &remoteid,
     249             :                         HASH_FIND, &found);
     250             : 
     251        1716 :     if (!found)
     252           0 :         elog(ERROR, "no relation map entry for remote relation ID %u",
     253             :              remoteid);
     254             : 
     255        1716 :     remoterel = &entry->remoterel;
     256             : 
     257             :     /*
     258             :      * When opening and locking a relation, pending invalidation messages are
     259             :      * processed which can invalidate the relation.  We need to update the
     260             :      * local cache both when we are first time accessing the relation and when
     261             :      * the relation is invalidated (aka entry->localreloid is set InvalidOid).
     262             :      */
     263        1716 :     if (!OidIsValid(entry->localreloid))
     264             :     {
     265             :         /* Try to find and lock the relation by name. */
     266         260 :         relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname,
     267             :                                               remoterel->relname, -1),
     268             :                                  lockmode, true);
     269         260 :         if (!OidIsValid(relid))
     270           2 :             ereport(ERROR,
     271             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     272             :                      errmsg("logical replication target relation \"%s.%s\" does not exist",
     273             :                             remoterel->nspname, remoterel->relname)));
     274         258 :         entry->localrel = table_open(relid, NoLock);
     275             : 
     276             :     }
     277             :     else
     278             :     {
     279        1456 :         relid = entry->localreloid;
     280        1456 :         entry->localrel = table_open(entry->localreloid, lockmode);
     281             :     }
     282             : 
     283        1714 :     if (!OidIsValid(entry->localreloid))
     284             :     {
     285             :         int         found;
     286             :         Bitmapset  *idkey;
     287             :         TupleDesc   desc;
     288             :         MemoryContext oldctx;
     289             :         int         i;
     290             : 
     291             :         /* Check for supported relkind. */
     292         258 :         CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind,
     293         258 :                                  remoterel->nspname, remoterel->relname);
     294             : 
     295             :         /*
     296             :          * Build the mapping of local attribute numbers to remote attribute
     297             :          * numbers and validate that we don't miss any replicated columns as
     298             :          * that would result in potentially unwanted data loss.
     299             :          */
     300         258 :         desc = RelationGetDescr(entry->localrel);
     301         258 :         oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
     302         258 :         entry->attrmap = make_attrmap(desc->natts);
     303         258 :         MemoryContextSwitchTo(oldctx);
     304             : 
     305         258 :         found = 0;
     306         828 :         for (i = 0; i < desc->natts; i++)
     307             :         {
     308             :             int         attnum;
     309         570 :             Form_pg_attribute attr = TupleDescAttr(desc, i);
     310             : 
     311         570 :             if (attr->attisdropped || attr->attgenerated)
     312             :             {
     313           6 :                 entry->attrmap->attnums[i] = -1;
     314           6 :                 continue;
     315             :             }
     316             : 
     317         564 :             attnum = logicalrep_rel_att_by_name(remoterel,
     318         564 :                                                 NameStr(attr->attname));
     319             : 
     320         564 :             entry->attrmap->attnums[i] = attnum;
     321         564 :             if (attnum >= 0)
     322         460 :                 found++;
     323             :         }
     324             : 
     325             :         /* TODO, detail message with names of missing columns */
     326         258 :         if (found < remoterel->natts)
     327           0 :             ereport(ERROR,
     328             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     329             :                      errmsg("logical replication target relation \"%s.%s\" is missing "
     330             :                             "some replicated columns",
     331             :                             remoterel->nspname, remoterel->relname)));
     332             : 
     333             :         /*
     334             :          * Check that replica identity matches. We allow for stricter replica
     335             :          * identity (fewer columns) on subscriber as that will not stop us
     336             :          * from finding unique tuple. IE, if publisher has identity
     337             :          * (id,timestamp) and subscriber just (id) this will not be a problem,
     338             :          * but in the opposite scenario it will.
     339             :          *
     340             :          * Don't throw any error here just mark the relation entry as not
     341             :          * updatable, as replica identity is only for updates and deletes but
     342             :          * inserts can be replicated even without it.
     343             :          */
     344         258 :         entry->updatable = true;
     345         258 :         idkey = RelationGetIndexAttrBitmap(entry->localrel,
     346             :                                            INDEX_ATTR_BITMAP_IDENTITY_KEY);
     347             :         /* fallback to PK if no replica identity */
     348         258 :         if (idkey == NULL)
     349             :         {
     350          52 :             idkey = RelationGetIndexAttrBitmap(entry->localrel,
     351             :                                                INDEX_ATTR_BITMAP_PRIMARY_KEY);
     352             : 
     353             :             /*
     354             :              * If no replica identity index and no PK, the published table
     355             :              * must have replica identity FULL.
     356             :              */
     357          52 :             if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
     358          38 :                 entry->updatable = false;
     359             :         }
     360             : 
     361         258 :         i = -1;
     362         464 :         while ((i = bms_next_member(idkey, i)) >= 0)
     363             :         {
     364         210 :             int         attnum = i + FirstLowInvalidHeapAttributeNumber;
     365             : 
     366         210 :             if (!AttrNumberIsForUserDefinedAttr(attnum))
     367           0 :                 ereport(ERROR,
     368             :                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     369             :                          errmsg("logical replication target relation \"%s.%s\" uses "
     370             :                                 "system columns in REPLICA IDENTITY index",
     371             :                                 remoterel->nspname, remoterel->relname)));
     372             : 
     373         210 :             attnum = AttrNumberGetAttrOffset(attnum);
     374             : 
     375         210 :             if (entry->attrmap->attnums[attnum] < 0 ||
     376         208 :                 !bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
     377             :             {
     378           4 :                 entry->updatable = false;
     379           4 :                 break;
     380             :             }
     381             :         }
     382             : 
     383         258 :         entry->localreloid = relid;
     384             :     }
     385             : 
     386        1714 :     if (entry->state != SUBREL_STATE_READY)
     387         274 :         entry->state = GetSubscriptionRelState(MySubscription->oid,
     388             :                                                entry->localreloid,
     389             :                                                &entry->statelsn,
     390             :                                                true);
     391             : 
     392        1714 :     return entry;
     393             : }
     394             : 
     395             : /*
     396             :  * Close the previously opened logical relation.
     397             :  */
     398             : void
     399        1710 : logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
     400             : {
     401        1710 :     table_close(rel->localrel, lockmode);
     402        1710 :     rel->localrel = NULL;
     403        1710 : }
     404             : 
     405             : /*
     406             :  * Free the type map cache entry data.
     407             :  */
     408             : static void
     409           8 : logicalrep_typmap_free_entry(LogicalRepTyp *entry)
     410             : {
     411           8 :     pfree(entry->nspname);
     412           8 :     pfree(entry->typname);
     413           8 : }
     414             : 
     415             : /*
     416             :  * Add new entry or update existing entry in the type map cache.
     417             :  */
     418             : void
     419          32 : logicalrep_typmap_update(LogicalRepTyp *remotetyp)
     420             : {
     421             :     MemoryContext oldctx;
     422             :     LogicalRepTyp *entry;
     423             :     bool        found;
     424             : 
     425          32 :     if (LogicalRepTypMap == NULL)
     426           0 :         logicalrep_relmap_init();
     427             : 
     428             :     /*
     429             :      * HASH_ENTER returns the existing entry if present or creates a new one.
     430             :      */
     431          32 :     entry = hash_search(LogicalRepTypMap, (void *) &remotetyp->remoteid,
     432             :                         HASH_ENTER, &found);
     433             : 
     434          32 :     if (found)
     435           8 :         logicalrep_typmap_free_entry(entry);
     436             : 
     437             :     /* Make cached copy of the data */
     438          32 :     entry->remoteid = remotetyp->remoteid;
     439          32 :     oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
     440          32 :     entry->nspname = pstrdup(remotetyp->nspname);
     441          32 :     entry->typname = pstrdup(remotetyp->typname);
     442          32 :     MemoryContextSwitchTo(oldctx);
     443          32 : }
     444             : 
     445             : /*
     446             :  * Fetch type name from the cache by remote type OID.
     447             :  *
     448             :  * Return a substitute value if we cannot find the data type; no message is
     449             :  * sent to the log in that case, because this is used by error callback
     450             :  * already.
     451             :  */
     452             : char *
     453           0 : logicalrep_typmap_gettypname(Oid remoteid)
     454             : {
     455             :     LogicalRepTyp *entry;
     456             :     bool        found;
     457             : 
     458             :     /* Internal types are mapped directly. */
     459           0 :     if (remoteid < FirstGenbkiObjectId)
     460             :     {
     461           0 :         if (!get_typisdefined(remoteid))
     462             :         {
     463             :             /*
     464             :              * This can be caused by having a publisher with a higher
     465             :              * PostgreSQL major version than the subscriber.
     466             :              */
     467           0 :             return psprintf("unrecognized %u", remoteid);
     468             :         }
     469             : 
     470           0 :         return format_type_be(remoteid);
     471             :     }
     472             : 
     473           0 :     if (LogicalRepTypMap == NULL)
     474             :     {
     475             :         /*
     476             :          * If the typemap is not initialized yet, we cannot possibly attempt
     477             :          * to search the hash table; but there's no way we know the type
     478             :          * locally yet, since we haven't received a message about this type,
     479             :          * so this is the best we can do.
     480             :          */
     481           0 :         return psprintf("unrecognized %u", remoteid);
     482             :     }
     483             : 
     484             :     /* search the mapping */
     485           0 :     entry = hash_search(LogicalRepTypMap, (void *) &remoteid,
     486             :                         HASH_FIND, &found);
     487           0 :     if (!found)
     488           0 :         return psprintf("unrecognized %u", remoteid);
     489             : 
     490             :     Assert(OidIsValid(entry->remoteid));
     491           0 :     return psprintf("%s.%s", entry->nspname, entry->typname);
     492             : }
     493             : 
     494             : /*
     495             :  * Partition cache: look up partition LogicalRepRelMapEntry's
     496             :  *
     497             :  * Unlike relation map cache, this is keyed by partition OID, not remote
     498             :  * relation OID, because we only have to use this cache in the case where
     499             :  * partitions are not directly mapped to any remote relation, such as when
     500             :  * replication is occurring with one of their ancestors as target.
     501             :  */
     502             : 
     503             : /*
     504             :  * Relcache invalidation callback
     505             :  */
     506             : static void
     507         416 : logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
     508             : {
     509             :     LogicalRepRelMapEntry *entry;
     510             : 
     511             :     /* Just to be sure. */
     512         416 :     if (LogicalRepPartMap == NULL)
     513           0 :         return;
     514             : 
     515         416 :     if (reloid != InvalidOid)
     516             :     {
     517             :         HASH_SEQ_STATUS status;
     518             : 
     519         416 :         hash_seq_init(&status, LogicalRepPartMap);
     520             : 
     521             :         /* TODO, use inverse lookup hashtable? */
     522        1024 :         while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
     523             :         {
     524         608 :             if (entry->localreloid == reloid)
     525             :             {
     526           0 :                 entry->localreloid = InvalidOid;
     527           0 :                 hash_seq_term(&status);
     528           0 :                 break;
     529             :             }
     530             :         }
     531             :     }
     532             :     else
     533             :     {
     534             :         /* invalidate all cache entries */
     535             :         HASH_SEQ_STATUS status;
     536             : 
     537           0 :         hash_seq_init(&status, LogicalRepPartMap);
     538             : 
     539           0 :         while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
     540           0 :             entry->localreloid = InvalidOid;
     541             :     }
     542             : }
     543             : 
     544             : /*
     545             :  * Initialize the partition map cache.
     546             :  */
     547             : static void
     548           6 : logicalrep_partmap_init(void)
     549             : {
     550             :     HASHCTL     ctl;
     551             : 
     552           6 :     if (!LogicalRepPartMapContext)
     553           6 :         LogicalRepPartMapContext =
     554           6 :             AllocSetContextCreate(CacheMemoryContext,
     555             :                                   "LogicalRepPartMapContext",
     556             :                                   ALLOCSET_DEFAULT_SIZES);
     557             : 
     558             :     /* Initialize the relation hash table. */
     559          84 :     MemSet(&ctl, 0, sizeof(ctl));
     560           6 :     ctl.keysize = sizeof(Oid);  /* partition OID */
     561           6 :     ctl.entrysize = sizeof(LogicalRepPartMapEntry);
     562           6 :     ctl.hcxt = LogicalRepPartMapContext;
     563             : 
     564           6 :     LogicalRepPartMap = hash_create("logicalrep partition map cache", 64, &ctl,
     565             :                                     HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
     566             : 
     567             :     /* Watch for invalidation events. */
     568           6 :     CacheRegisterRelcacheCallback(logicalrep_partmap_invalidate_cb,
     569             :                                   (Datum) 0);
     570           6 : }
     571             : 
     572             : /*
     573             :  * logicalrep_partition_open
     574             :  *
     575             :  * Returned entry reuses most of the values of the root table's entry, save
     576             :  * the attribute map, which can be different for the partition.
     577             :  *
     578             :  * Note there's no logicalrep_partition_close, because the caller closes the
     579             :  * the component relation.
     580             :  */
     581             : LogicalRepRelMapEntry *
     582          10 : logicalrep_partition_open(LogicalRepRelMapEntry *root,
     583             :                           Relation partrel, AttrMap *map)
     584             : {
     585             :     LogicalRepRelMapEntry *entry;
     586             :     LogicalRepPartMapEntry *part_entry;
     587          10 :     LogicalRepRelation *remoterel = &root->remoterel;
     588          10 :     Oid         partOid = RelationGetRelid(partrel);
     589          10 :     AttrMap    *attrmap = root->attrmap;
     590             :     bool        found;
     591             :     int         i;
     592             :     MemoryContext oldctx;
     593             : 
     594          10 :     if (LogicalRepPartMap == NULL)
     595           6 :         logicalrep_partmap_init();
     596             : 
     597             :     /* Search for existing entry. */
     598          10 :     part_entry = (LogicalRepPartMapEntry *) hash_search(LogicalRepPartMap,
     599             :                                                         (void *) &partOid,
     600             :                                                         HASH_ENTER, &found);
     601             : 
     602          10 :     if (found)
     603           2 :         return &part_entry->relmapentry;
     604             : 
     605           8 :     memset(part_entry, 0, sizeof(LogicalRepPartMapEntry));
     606             : 
     607             :     /* Switch to longer-lived context. */
     608           8 :     oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext);
     609             : 
     610           8 :     part_entry->partoid = partOid;
     611             : 
     612             :     /* Remote relation is used as-is from the root entry. */
     613           8 :     entry = &part_entry->relmapentry;
     614           8 :     entry->remoterel.remoteid = remoterel->remoteid;
     615           8 :     entry->remoterel.nspname = pstrdup(remoterel->nspname);
     616           8 :     entry->remoterel.relname = pstrdup(remoterel->relname);
     617           8 :     entry->remoterel.natts = remoterel->natts;
     618           8 :     entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
     619           8 :     entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
     620          24 :     for (i = 0; i < remoterel->natts; i++)
     621             :     {
     622          16 :         entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
     623          16 :         entry->remoterel.atttyps[i] = remoterel->atttyps[i];
     624             :     }
     625           8 :     entry->remoterel.replident = remoterel->replident;
     626           8 :     entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
     627             : 
     628           8 :     entry->localrel = partrel;
     629           8 :     entry->localreloid = partOid;
     630             : 
     631             :     /*
     632             :      * If the partition's attributes don't match the root relation's, we'll
     633             :      * need to make a new attrmap which maps partition attribute numbers to
     634             :      * remoterel's, instead the original which maps root relation's attribute
     635             :      * numbers to remoterel's.
     636             :      *
     637             :      * Note that 'map' which comes from the tuple routing data structure
     638             :      * contains 1-based attribute numbers (of the parent relation).  However,
     639             :      * the map in 'entry', a logical replication data structure, contains
     640             :      * 0-based attribute numbers (of the remote relation).
     641             :      */
     642           8 :     if (map)
     643             :     {
     644             :         AttrNumber  attno;
     645             : 
     646           4 :         entry->attrmap = make_attrmap(map->maplen);
     647          16 :         for (attno = 0; attno < entry->attrmap->maplen; attno++)
     648             :         {
     649          12 :             AttrNumber  root_attno = map->attnums[attno];
     650             : 
     651          12 :             entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1];
     652             :         }
     653             :     }
     654             :     else
     655           4 :         entry->attrmap = attrmap;
     656             : 
     657           8 :     entry->updatable = root->updatable;
     658             : 
     659             :     /* state and statelsn are left set to 0. */
     660           8 :     MemoryContextSwitchTo(oldctx);
     661             : 
     662           8 :     return entry;
     663             : }

Generated by: LCOV version 1.13