LCOV - code coverage report
Current view: top level - src/backend/replication/logical - relation.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 131 151 86.8 %
Date: 2019-11-15 23:07:02 Functions: 9 10 90.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * relation.c
       3             :  *     PostgreSQL logical replication
       4             :  *
       5             :  * Copyright (c) 2016-2019, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/relation.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file contains helper functions for logical replication relation
      12             :  *    mapping cache.
      13             :  *
      14             :  *-------------------------------------------------------------------------
      15             :  */
      16             : 
      17             : #include "postgres.h"
      18             : 
      19             : #include "access/sysattr.h"
      20             : #include "access/table.h"
      21             : #include "catalog/namespace.h"
      22             : #include "catalog/pg_subscription_rel.h"
      23             : #include "executor/executor.h"
      24             : #include "nodes/makefuncs.h"
      25             : #include "replication/logicalrelation.h"
      26             : #include "replication/worker_internal.h"
      27             : #include "utils/builtins.h"
      28             : #include "utils/inval.h"
      29             : #include "utils/lsyscache.h"
      30             : #include "utils/memutils.h"
      31             : #include "utils/syscache.h"
      32             : 
      33             : static MemoryContext LogicalRepRelMapContext = NULL;
      34             : 
      35             : static HTAB *LogicalRepRelMap = NULL;
      36             : static HTAB *LogicalRepTypMap = NULL;
      37             : 
      38             : 
      39             : /*
      40             :  * Relcache invalidation callback for our relation map cache.
      41             :  */
      42             : static void
      43         184 : logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
      44             : {
      45             :     LogicalRepRelMapEntry *entry;
      46             : 
      47             :     /* Just to be sure. */
      48         184 :     if (LogicalRepRelMap == NULL)
      49           0 :         return;
      50             : 
      51         184 :     if (reloid != InvalidOid)
      52             :     {
      53             :         HASH_SEQ_STATUS status;
      54             : 
      55         184 :         hash_seq_init(&status, LogicalRepRelMap);
      56             : 
      57             :         /* TODO, use inverse lookup hashtable? */
      58         184 :         while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
      59             :         {
      60         270 :             if (entry->localreloid == reloid)
      61             :             {
      62          20 :                 entry->localreloid = InvalidOid;
      63          20 :                 hash_seq_term(&status);
      64          20 :                 break;
      65             :             }
      66             :         }
      67             :     }
      68             :     else
      69             :     {
      70             :         /* invalidate all cache entries */
      71             :         HASH_SEQ_STATUS status;
      72             : 
      73           0 :         hash_seq_init(&status, LogicalRepRelMap);
      74             : 
      75           0 :         while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
      76           0 :             entry->localreloid = InvalidOid;
      77             :     }
      78             : }
      79             : 
      80             : /*
      81             :  * Initialize the relation map cache.
      82             :  */
      83             : static void
      84         108 : logicalrep_relmap_init(void)
      85             : {
      86             :     HASHCTL     ctl;
      87             : 
      88         108 :     if (!LogicalRepRelMapContext)
      89         108 :         LogicalRepRelMapContext =
      90         108 :             AllocSetContextCreate(CacheMemoryContext,
      91             :                                   "LogicalRepRelMapContext",
      92             :                                   ALLOCSET_DEFAULT_SIZES);
      93             : 
      94             :     /* Initialize the relation hash table. */
      95         108 :     MemSet(&ctl, 0, sizeof(ctl));
      96         108 :     ctl.keysize = sizeof(LogicalRepRelId);
      97         108 :     ctl.entrysize = sizeof(LogicalRepRelMapEntry);
      98         108 :     ctl.hcxt = LogicalRepRelMapContext;
      99             : 
     100         108 :     LogicalRepRelMap = hash_create("logicalrep relation map cache", 128, &ctl,
     101             :                                    HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
     102             : 
     103             :     /* Initialize the type hash table. */
     104         108 :     MemSet(&ctl, 0, sizeof(ctl));
     105         108 :     ctl.keysize = sizeof(Oid);
     106         108 :     ctl.entrysize = sizeof(LogicalRepTyp);
     107         108 :     ctl.hcxt = LogicalRepRelMapContext;
     108             : 
     109             :     /* This will usually be small. */
     110         108 :     LogicalRepTypMap = hash_create("logicalrep type map cache", 2, &ctl,
     111             :                                    HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
     112             : 
     113             :     /* Watch for invalidation events. */
     114         108 :     CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
     115             :                                   (Datum) 0);
     116         108 : }
     117             : 
     118             : /*
     119             :  * Free the entry of a relation map cache.
     120             :  */
     121             : static void
     122          12 : logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
     123             : {
     124             :     LogicalRepRelation *remoterel;
     125             : 
     126          12 :     remoterel = &entry->remoterel;
     127             : 
     128          12 :     pfree(remoterel->nspname);
     129          12 :     pfree(remoterel->relname);
     130             : 
     131          12 :     if (remoterel->natts > 0)
     132             :     {
     133             :         int         i;
     134             : 
     135          28 :         for (i = 0; i < remoterel->natts; i++)
     136          16 :             pfree(remoterel->attnames[i]);
     137             : 
     138          12 :         pfree(remoterel->attnames);
     139          12 :         pfree(remoterel->atttyps);
     140             :     }
     141          12 :     bms_free(remoterel->attkeys);
     142             : 
     143          12 :     if (entry->attrmap)
     144          12 :         pfree(entry->attrmap);
     145          12 : }
     146             : 
     147             : /*
     148             :  * Add new entry or update existing entry in the relation map cache.
     149             :  *
     150             :  * Called when new relation mapping is sent by the publisher to update
     151             :  * our expected view of incoming data from said publisher.
     152             :  */
     153             : void
     154         166 : logicalrep_relmap_update(LogicalRepRelation *remoterel)
     155             : {
     156             :     MemoryContext oldctx;
     157             :     LogicalRepRelMapEntry *entry;
     158             :     bool        found;
     159             :     int         i;
     160             : 
     161         166 :     if (LogicalRepRelMap == NULL)
     162         108 :         logicalrep_relmap_init();
     163             : 
     164             :     /*
     165             :      * HASH_ENTER returns the existing entry if present or creates a new one.
     166             :      */
     167         166 :     entry = hash_search(LogicalRepRelMap, (void *) &remoterel->remoteid,
     168             :                         HASH_ENTER, &found);
     169             : 
     170         166 :     if (found)
     171          12 :         logicalrep_relmap_free_entry(entry);
     172             : 
     173         166 :     memset(entry, 0, sizeof(LogicalRepRelMapEntry));
     174             : 
     175             :     /* Make cached copy of the data */
     176         166 :     oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
     177         166 :     entry->remoterel.remoteid = remoterel->remoteid;
     178         166 :     entry->remoterel.nspname = pstrdup(remoterel->nspname);
     179         166 :     entry->remoterel.relname = pstrdup(remoterel->relname);
     180         166 :     entry->remoterel.natts = remoterel->natts;
     181         166 :     entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
     182         166 :     entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
     183         442 :     for (i = 0; i < remoterel->natts; i++)
     184             :     {
     185         276 :         entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
     186         276 :         entry->remoterel.atttyps[i] = remoterel->atttyps[i];
     187             :     }
     188         166 :     entry->remoterel.replident = remoterel->replident;
     189         166 :     entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
     190         166 :     MemoryContextSwitchTo(oldctx);
     191         166 : }
     192             : 
     193             : /*
     194             :  * Find attribute index in TupleDesc struct by attribute name.
     195             :  *
     196             :  * Returns -1 if not found.
     197             :  */
     198             : static int
     199         294 : logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
     200             : {
     201             :     int         i;
     202             : 
     203         456 :     for (i = 0; i < remoterel->natts; i++)
     204             :     {
     205         438 :         if (strcmp(remoterel->attnames[i], attname) == 0)
     206         276 :             return i;
     207             :     }
     208             : 
     209          18 :     return -1;
     210             : }
     211             : 
     212             : /*
     213             :  * Open the local relation associated with the remote one.
     214             :  *
     215             :  * Optionally rebuilds the Relcache mapping if it was invalidated
     216             :  * by local DDL.
     217             :  */
     218             : LogicalRepRelMapEntry *
     219        1414 : logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
     220             : {
     221             :     LogicalRepRelMapEntry *entry;
     222             :     bool        found;
     223             : 
     224        1414 :     if (LogicalRepRelMap == NULL)
     225           0 :         logicalrep_relmap_init();
     226             : 
     227             :     /* Search for existing entry. */
     228        1414 :     entry = hash_search(LogicalRepRelMap, (void *) &remoteid,
     229             :                         HASH_FIND, &found);
     230             : 
     231        1414 :     if (!found)
     232           0 :         elog(ERROR, "no relation map entry for remote relation ID %u",
     233             :              remoteid);
     234             : 
     235             :     /* Need to update the local cache? */
     236        1414 :     if (!OidIsValid(entry->localreloid))
     237             :     {
     238             :         Oid         relid;
     239             :         int         i;
     240             :         int         found;
     241             :         Bitmapset  *idkey;
     242             :         TupleDesc   desc;
     243             :         LogicalRepRelation *remoterel;
     244             :         MemoryContext oldctx;
     245             : 
     246         168 :         remoterel = &entry->remoterel;
     247             : 
     248             :         /* Try to find and lock the relation by name. */
     249         168 :         relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname,
     250             :                                               remoterel->relname, -1),
     251             :                                  lockmode, true);
     252         168 :         if (!OidIsValid(relid))
     253           2 :             ereport(ERROR,
     254             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     255             :                      errmsg("logical replication target relation \"%s.%s\" does not exist",
     256             :                             remoterel->nspname, remoterel->relname)));
     257         166 :         entry->localrel = table_open(relid, NoLock);
     258             : 
     259             :         /* Check for supported relkind. */
     260         166 :         CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind,
     261         166 :                                  remoterel->nspname, remoterel->relname);
     262             : 
     263             :         /*
     264             :          * Build the mapping of local attribute numbers to remote attribute
     265             :          * numbers and validate that we don't miss any replicated columns as
     266             :          * that would result in potentially unwanted data loss.
     267             :          */
     268         166 :         desc = RelationGetDescr(entry->localrel);
     269         166 :         oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
     270         166 :         entry->attrmap = palloc(desc->natts * sizeof(AttrNumber));
     271         166 :         MemoryContextSwitchTo(oldctx);
     272             : 
     273         166 :         found = 0;
     274         464 :         for (i = 0; i < desc->natts; i++)
     275             :         {
     276             :             int         attnum;
     277         298 :             Form_pg_attribute attr = TupleDescAttr(desc, i);
     278             : 
     279         298 :             if (attr->attisdropped || attr->attgenerated)
     280             :             {
     281           4 :                 entry->attrmap[i] = -1;
     282           4 :                 continue;
     283             :             }
     284             : 
     285         294 :             attnum = logicalrep_rel_att_by_name(remoterel,
     286         294 :                                                 NameStr(attr->attname));
     287             : 
     288         294 :             entry->attrmap[i] = attnum;
     289         294 :             if (attnum >= 0)
     290         276 :                 found++;
     291             :         }
     292             : 
     293             :         /* TODO, detail message with names of missing columns */
     294         166 :         if (found < remoterel->natts)
     295           0 :             ereport(ERROR,
     296             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     297             :                      errmsg("logical replication target relation \"%s.%s\" is missing "
     298             :                             "some replicated columns",
     299             :                             remoterel->nspname, remoterel->relname)));
     300             : 
     301             :         /*
     302             :          * Check that replica identity matches. We allow for stricter replica
     303             :          * identity (fewer columns) on subscriber as that will not stop us
     304             :          * from finding unique tuple. IE, if publisher has identity
     305             :          * (id,timestamp) and subscriber just (id) this will not be a problem,
     306             :          * but in the opposite scenario it will.
     307             :          *
     308             :          * Don't throw any error here just mark the relation entry as not
     309             :          * updatable, as replica identity is only for updates and deletes but
     310             :          * inserts can be replicated even without it.
     311             :          */
     312         166 :         entry->updatable = true;
     313         166 :         idkey = RelationGetIndexAttrBitmap(entry->localrel,
     314             :                                            INDEX_ATTR_BITMAP_IDENTITY_KEY);
     315             :         /* fallback to PK if no replica identity */
     316         166 :         if (idkey == NULL)
     317             :         {
     318          44 :             idkey = RelationGetIndexAttrBitmap(entry->localrel,
     319             :                                                INDEX_ATTR_BITMAP_PRIMARY_KEY);
     320             : 
     321             :             /*
     322             :              * If no replica identity index and no PK, the published table
     323             :              * must have replica identity FULL.
     324             :              */
     325          44 :             if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
     326          34 :                 entry->updatable = false;
     327             :         }
     328             : 
     329         166 :         i = -1;
     330         452 :         while ((i = bms_next_member(idkey, i)) >= 0)
     331             :         {
     332         122 :             int         attnum = i + FirstLowInvalidHeapAttributeNumber;
     333             : 
     334         122 :             if (!AttrNumberIsForUserDefinedAttr(attnum))
     335           0 :                 ereport(ERROR,
     336             :                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     337             :                          errmsg("logical replication target relation \"%s.%s\" uses "
     338             :                                 "system columns in REPLICA IDENTITY index",
     339             :                                 remoterel->nspname, remoterel->relname)));
     340             : 
     341         122 :             attnum = AttrNumberGetAttrOffset(attnum);
     342             : 
     343         242 :             if (entry->attrmap[attnum] < 0 ||
     344         120 :                 !bms_is_member(entry->attrmap[attnum], remoterel->attkeys))
     345             :             {
     346           2 :                 entry->updatable = false;
     347           2 :                 break;
     348             :             }
     349             :         }
     350             : 
     351         166 :         entry->localreloid = relid;
     352             :     }
     353             :     else
     354        1246 :         entry->localrel = table_open(entry->localreloid, lockmode);
     355             : 
     356        1412 :     if (entry->state != SUBREL_STATE_READY)
     357         184 :         entry->state = GetSubscriptionRelState(MySubscription->oid,
     358             :                                                entry->localreloid,
     359             :                                                &entry->statelsn,
     360             :                                                true);
     361             : 
     362        1412 :     return entry;
     363             : }
     364             : 
     365             : /*
     366             :  * Close the previously opened logical relation.
     367             :  */
     368             : void
     369        1408 : logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
     370             : {
     371        1408 :     table_close(rel->localrel, lockmode);
     372        1408 :     rel->localrel = NULL;
     373        1408 : }
     374             : 
     375             : /*
     376             :  * Free the type map cache entry data.
     377             :  */
     378             : static void
     379           8 : logicalrep_typmap_free_entry(LogicalRepTyp *entry)
     380             : {
     381           8 :     pfree(entry->nspname);
     382           8 :     pfree(entry->typname);
     383           8 : }
     384             : 
     385             : /*
     386             :  * Add new entry or update existing entry in the type map cache.
     387             :  */
     388             : void
     389          32 : logicalrep_typmap_update(LogicalRepTyp *remotetyp)
     390             : {
     391             :     MemoryContext oldctx;
     392             :     LogicalRepTyp *entry;
     393             :     bool        found;
     394             : 
     395          32 :     if (LogicalRepTypMap == NULL)
     396           0 :         logicalrep_relmap_init();
     397             : 
     398             :     /*
     399             :      * HASH_ENTER returns the existing entry if present or creates a new one.
     400             :      */
     401          32 :     entry = hash_search(LogicalRepTypMap, (void *) &remotetyp->remoteid,
     402             :                         HASH_ENTER, &found);
     403             : 
     404          32 :     if (found)
     405           8 :         logicalrep_typmap_free_entry(entry);
     406             : 
     407             :     /* Make cached copy of the data */
     408          32 :     entry->remoteid = remotetyp->remoteid;
     409          32 :     oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
     410          32 :     entry->nspname = pstrdup(remotetyp->nspname);
     411          32 :     entry->typname = pstrdup(remotetyp->typname);
     412          32 :     MemoryContextSwitchTo(oldctx);
     413          32 : }
     414             : 
     415             : /*
     416             :  * Fetch type name from the cache by remote type OID.
     417             :  *
     418             :  * Return a substitute value if we cannot find the data type; no message is
     419             :  * sent to the log in that case, because this is used by error callback
     420             :  * already.
     421             :  */
     422             : char *
     423           0 : logicalrep_typmap_gettypname(Oid remoteid)
     424             : {
     425             :     LogicalRepTyp *entry;
     426             :     bool        found;
     427             : 
     428             :     /* Internal types are mapped directly. */
     429           0 :     if (remoteid < FirstGenbkiObjectId)
     430             :     {
     431           0 :         if (!get_typisdefined(remoteid))
     432             :         {
     433             :             /*
     434             :              * This can be caused by having a publisher with a higher
     435             :              * PostgreSQL major version than the subscriber.
     436             :              */
     437           0 :             return psprintf("unrecognized %u", remoteid);
     438             :         }
     439             : 
     440           0 :         return format_type_be(remoteid);
     441             :     }
     442             : 
     443           0 :     if (LogicalRepTypMap == NULL)
     444             :     {
     445             :         /*
     446             :          * If the typemap is not initialized yet, we cannot possibly attempt
     447             :          * to search the hash table; but there's no way we know the type
     448             :          * locally yet, since we haven't received a message about this type,
     449             :          * so this is the best we can do.
     450             :          */
     451           0 :         return psprintf("unrecognized %u", remoteid);
     452             :     }
     453             : 
     454             :     /* search the mapping */
     455           0 :     entry = hash_search(LogicalRepTypMap, (void *) &remoteid,
     456             :                         HASH_FIND, &found);
     457           0 :     if (!found)
     458           0 :         return psprintf("unrecognized %u", remoteid);
     459             : 
     460             :     Assert(OidIsValid(entry->remoteid));
     461           0 :     return psprintf("%s.%s", entry->nspname, entry->typname);
     462             : }

Generated by: LCOV version 1.13