LCOV - code coverage report
Current view: top level - src/backend/replication/pgoutput - pgoutput.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 235 277 84.8 %
Date: 2020-06-03 11:07:14 Functions: 18 18 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pgoutput.c
       4             :  *      Logical Replication output plugin
       5             :  *
       6             :  * Copyright (c) 2012-2020, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *        src/backend/replication/pgoutput/pgoutput.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "access/tupconvert.h"
      16             : #include "catalog/partition.h"
      17             : #include "catalog/pg_publication.h"
      18             : #include "fmgr.h"
      19             : #include "replication/logical.h"
      20             : #include "replication/logicalproto.h"
      21             : #include "replication/origin.h"
      22             : #include "replication/pgoutput.h"
      23             : #include "utils/int8.h"
      24             : #include "utils/inval.h"
      25             : #include "utils/lsyscache.h"
      26             : #include "utils/memutils.h"
      27             : #include "utils/syscache.h"
      28             : #include "utils/varlena.h"
      29             : 
      30         184 : PG_MODULE_MAGIC;
      31             : 
      32             : extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
      33             : 
      34             : static void pgoutput_startup(LogicalDecodingContext *ctx,
      35             :                              OutputPluginOptions *opt, bool is_init);
      36             : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
      37             : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
      38             :                                ReorderBufferTXN *txn);
      39             : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
      40             :                                 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      41             : static void pgoutput_change(LogicalDecodingContext *ctx,
      42             :                             ReorderBufferTXN *txn, Relation rel,
      43             :                             ReorderBufferChange *change);
      44             : static void pgoutput_truncate(LogicalDecodingContext *ctx,
      45             :                               ReorderBufferTXN *txn, int nrelations, Relation relations[],
      46             :                               ReorderBufferChange *change);
      47             : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
      48             :                                    RepOriginId origin_id);
      49             : 
      50             : static bool publications_valid;
      51             : 
      52             : static List *LoadPublications(List *pubnames);
      53             : static void publication_invalidation_cb(Datum arg, int cacheid,
      54             :                                         uint32 hashvalue);
      55             : static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx);
      56             : 
      57             : /*
      58             :  * Entry in the map used to remember which relation schemas we sent.
      59             :  *
      60             :  * For partitions, 'pubactions' considers not only the table's own
      61             :  * publications, but also those of all of its ancestors.
      62             :  */
      63             : typedef struct RelationSyncEntry
      64             : {
      65             :     Oid         relid;          /* relation oid */
      66             : 
      67             :     /*
      68             :      * Did we send the schema?  If ancestor relid is set, its schema must also
      69             :      * have been sent for this to be true.
      70             :      */
      71             :     bool        schema_sent;
      72             : 
      73             :     bool        replicate_valid;
      74             :     PublicationActions pubactions;
      75             : 
      76             :     /*
      77             :      * OID of the relation to publish changes as.  For a partition, this may
      78             :      * be set to one of its ancestors whose schema will be used when
      79             :      * replicating changes, if publish_via_partition_root is set for the
      80             :      * publication.
      81             :      */
      82             :     Oid         publish_as_relid;
      83             : 
      84             :     /*
      85             :      * Map used when replicating using an ancestor's schema to convert tuples
      86             :      * from partition's type to the ancestor's; NULL if publish_as_relid is
      87             :      * same as 'relid' or if unnecessary due to partition and the ancestor
      88             :      * having identical TupleDesc.
      89             :      */
      90             :     TupleConversionMap *map;
      91             : } RelationSyncEntry;
      92             : 
      93             : /* Map used to remember which relation schemas we sent. */
      94             : static HTAB *RelationSyncCache = NULL;
      95             : 
      96             : static void init_rel_sync_cache(MemoryContext decoding_context);
      97             : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
      98             : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
      99             : static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
     100             :                                           uint32 hashvalue);
     101             : 
     102             : /*
     103             :  * Specify output plugin callbacks
     104             :  */
     105             : void
     106         190 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
     107             : {
     108             :     AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
     109             : 
     110         190 :     cb->startup_cb = pgoutput_startup;
     111         190 :     cb->begin_cb = pgoutput_begin_txn;
     112         190 :     cb->change_cb = pgoutput_change;
     113         190 :     cb->truncate_cb = pgoutput_truncate;
     114         190 :     cb->commit_cb = pgoutput_commit_txn;
     115         190 :     cb->filter_by_origin_cb = pgoutput_origin_filter;
     116         190 :     cb->shutdown_cb = pgoutput_shutdown;
     117         190 : }
     118             : 
     119             : static void
     120          40 : parse_output_parameters(List *options, uint32 *protocol_version,
     121             :                         List **publication_names)
     122             : {
     123             :     ListCell   *lc;
     124          40 :     bool        protocol_version_given = false;
     125          40 :     bool        publication_names_given = false;
     126             : 
     127         120 :     foreach(lc, options)
     128             :     {
     129          80 :         DefElem    *defel = (DefElem *) lfirst(lc);
     130             : 
     131             :         Assert(defel->arg == NULL || IsA(defel->arg, String));
     132             : 
     133             :         /* Check each param, whether or not we recognize it */
     134          80 :         if (strcmp(defel->defname, "proto_version") == 0)
     135             :         {
     136             :             int64       parsed;
     137             : 
     138          40 :             if (protocol_version_given)
     139           0 :                 ereport(ERROR,
     140             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     141             :                          errmsg("conflicting or redundant options")));
     142          40 :             protocol_version_given = true;
     143             : 
     144          40 :             if (!scanint8(strVal(defel->arg), true, &parsed))
     145           0 :                 ereport(ERROR,
     146             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     147             :                          errmsg("invalid proto_version")));
     148             : 
     149          40 :             if (parsed > PG_UINT32_MAX || parsed < 0)
     150           0 :                 ereport(ERROR,
     151             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     152             :                          errmsg("proto_version \"%s\" out of range",
     153             :                                 strVal(defel->arg))));
     154             : 
     155          40 :             *protocol_version = (uint32) parsed;
     156             :         }
     157          40 :         else if (strcmp(defel->defname, "publication_names") == 0)
     158             :         {
     159          40 :             if (publication_names_given)
     160           0 :                 ereport(ERROR,
     161             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     162             :                          errmsg("conflicting or redundant options")));
     163          40 :             publication_names_given = true;
     164             : 
     165          40 :             if (!SplitIdentifierString(strVal(defel->arg), ',',
     166             :                                        publication_names))
     167           0 :                 ereport(ERROR,
     168             :                         (errcode(ERRCODE_INVALID_NAME),
     169             :                          errmsg("invalid publication_names syntax")));
     170             :         }
     171             :         else
     172           0 :             elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
     173             :     }
     174          40 : }
     175             : 
     176             : /*
     177             :  * Initialize this plugin
     178             :  */
     179             : static void
     180         190 : pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
     181             :                  bool is_init)
     182             : {
     183         190 :     PGOutputData *data = palloc0(sizeof(PGOutputData));
     184             : 
     185             :     /* Create our memory context for private allocations. */
     186         190 :     data->context = AllocSetContextCreate(ctx->context,
     187             :                                           "logical replication output context",
     188             :                                           ALLOCSET_DEFAULT_SIZES);
     189             : 
     190         190 :     ctx->output_plugin_private = data;
     191             : 
     192             :     /* This plugin uses binary protocol. */
     193         190 :     opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
     194             : 
     195             :     /*
     196             :      * This is replication start and not slot initialization.
     197             :      *
     198             :      * Parse and validate options passed by the client.
     199             :      */
     200         190 :     if (!is_init)
     201             :     {
     202             :         /* Parse the params and ERROR if we see any we don't recognize */
     203          40 :         parse_output_parameters(ctx->output_plugin_options,
     204             :                                 &data->protocol_version,
     205             :                                 &data->publication_names);
     206             : 
     207             :         /* Check if we support requested protocol */
     208          40 :         if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM)
     209           0 :             ereport(ERROR,
     210             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     211             :                      errmsg("client sent proto_version=%d but we only support protocol %d or lower",
     212             :                             data->protocol_version, LOGICALREP_PROTO_VERSION_NUM)));
     213             : 
     214          40 :         if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
     215           0 :             ereport(ERROR,
     216             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     217             :                      errmsg("client sent proto_version=%d but we only support protocol %d or higher",
     218             :                             data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
     219             : 
     220          40 :         if (list_length(data->publication_names) < 1)
     221           0 :             ereport(ERROR,
     222             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     223             :                      errmsg("publication_names parameter missing")));
     224             : 
     225             :         /* Init publication state. */
     226          40 :         data->publications = NIL;
     227          40 :         publications_valid = false;
     228          40 :         CacheRegisterSyscacheCallback(PUBLICATIONOID,
     229             :                                       publication_invalidation_cb,
     230             :                                       (Datum) 0);
     231             : 
     232             :         /* Initialize relation schema cache. */
     233          40 :         init_rel_sync_cache(CacheMemoryContext);
     234             :     }
     235         190 : }
     236             : 
     237             : /*
     238             :  * BEGIN callback
     239             :  */
     240             : static void
     241         308 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     242             : {
     243         308 :     bool        send_replication_origin = txn->origin_id != InvalidRepOriginId;
     244             : 
     245         308 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
     246         308 :     logicalrep_write_begin(ctx->out, txn);
     247             : 
     248         308 :     if (send_replication_origin)
     249             :     {
     250             :         char       *origin;
     251             : 
     252             :         /* Message boundary */
     253           0 :         OutputPluginWrite(ctx, false);
     254           0 :         OutputPluginPrepareWrite(ctx, true);
     255             : 
     256             :         /*----------
     257             :          * XXX: which behaviour do we want here?
     258             :          *
     259             :          * Alternatives:
     260             :          *  - don't send origin message if origin name not found
     261             :          *    (that's what we do now)
     262             :          *  - throw error - that will break replication, not good
     263             :          *  - send some special "unknown" origin
     264             :          *----------
     265             :          */
     266           0 :         if (replorigin_by_oid(txn->origin_id, true, &origin))
     267           0 :             logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
     268             :     }
     269             : 
     270         308 :     OutputPluginWrite(ctx, true);
     271         308 : }
     272             : 
     273             : /*
     274             :  * COMMIT callback
     275             :  */
     276             : static void
     277         308 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     278             :                     XLogRecPtr commit_lsn)
     279             : {
     280         308 :     OutputPluginUpdateProgress(ctx);
     281             : 
     282         308 :     OutputPluginPrepareWrite(ctx, true);
     283         308 :     logicalrep_write_commit(ctx->out, txn, commit_lsn);
     284         308 :     OutputPluginWrite(ctx, true);
     285         308 : }
     286             : 
     287             : /*
     288             :  * Write the current schema of the relation and its ancestor (if any) if not
     289             :  * done yet.
     290             :  */
     291             : static void
     292        1424 : maybe_send_schema(LogicalDecodingContext *ctx,
     293             :                   Relation relation, RelationSyncEntry *relentry)
     294             : {
     295        1424 :     if (relentry->schema_sent)
     296        1326 :         return;
     297             : 
     298             :     /* If needed, send the ancestor's schema first. */
     299          98 :     if (relentry->publish_as_relid != RelationGetRelid(relation))
     300             :     {
     301           0 :         Relation    ancestor = RelationIdGetRelation(relentry->publish_as_relid);
     302           0 :         TupleDesc   indesc = RelationGetDescr(relation);
     303           0 :         TupleDesc   outdesc = RelationGetDescr(ancestor);
     304             :         MemoryContext oldctx;
     305             : 
     306             :         /* Map must live as long as the session does. */
     307           0 :         oldctx = MemoryContextSwitchTo(CacheMemoryContext);
     308           0 :         relentry->map = convert_tuples_by_name(CreateTupleDescCopy(indesc),
     309             :                                                CreateTupleDescCopy(outdesc));
     310           0 :         MemoryContextSwitchTo(oldctx);
     311           0 :         send_relation_and_attrs(ancestor, ctx);
     312           0 :         RelationClose(ancestor);
     313             :     }
     314             : 
     315          98 :     send_relation_and_attrs(relation, ctx);
     316          98 :     relentry->schema_sent = true;
     317             : }
     318             : 
     319             : /*
     320             :  * Sends a relation
     321             :  */
     322             : static void
     323          98 : send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx)
     324             : {
     325          98 :     TupleDesc   desc = RelationGetDescr(relation);
     326             :     int         i;
     327             : 
     328             :     /*
     329             :      * Write out type info if needed.  We do that only for user-created types.
     330             :      * We use FirstGenbkiObjectId as the cutoff, so that we only consider
     331             :      * objects with hand-assigned OIDs to be "built in", not for instance any
     332             :      * function or type defined in the information_schema. This is important
     333             :      * because only hand-assigned OIDs can be expected to remain stable across
     334             :      * major versions.
     335             :      */
     336         276 :     for (i = 0; i < desc->natts; i++)
     337             :     {
     338         178 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     339             : 
     340         178 :         if (att->attisdropped || att->attgenerated)
     341           2 :             continue;
     342             : 
     343         176 :         if (att->atttypid < FirstGenbkiObjectId)
     344         144 :             continue;
     345             : 
     346          32 :         OutputPluginPrepareWrite(ctx, false);
     347          32 :         logicalrep_write_typ(ctx->out, att->atttypid);
     348          32 :         OutputPluginWrite(ctx, false);
     349             :     }
     350             : 
     351          98 :     OutputPluginPrepareWrite(ctx, false);
     352          98 :     logicalrep_write_rel(ctx->out, relation);
     353          98 :     OutputPluginWrite(ctx, false);
     354          98 : }
     355             : 
     356             : /*
     357             :  * Sends the decoded DML over wire.
     358             :  */
     359             : static void
     360        3602 : pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     361             :                 Relation relation, ReorderBufferChange *change)
     362             : {
     363        3602 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
     364             :     MemoryContext old;
     365             :     RelationSyncEntry *relentry;
     366             : 
     367        3602 :     if (!is_publishable_relation(relation))
     368           4 :         return;
     369             : 
     370        3598 :     relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
     371             : 
     372             :     /* First check the table filter */
     373        3598 :     switch (change->action)
     374             :     {
     375         782 :         case REORDER_BUFFER_CHANGE_INSERT:
     376         782 :             if (!relentry->pubactions.pubinsert)
     377           0 :                 return;
     378         782 :             break;
     379         318 :         case REORDER_BUFFER_CHANGE_UPDATE:
     380         318 :             if (!relentry->pubactions.pubupdate)
     381          80 :                 return;
     382         238 :             break;
     383        2498 :         case REORDER_BUFFER_CHANGE_DELETE:
     384        2498 :             if (!relentry->pubactions.pubdelete)
     385        2104 :                 return;
     386         394 :             break;
     387        1414 :         default:
     388             :             Assert(false);
     389             :     }
     390             : 
     391             :     /* Avoid leaking memory by using and resetting our own context */
     392        1414 :     old = MemoryContextSwitchTo(data->context);
     393             : 
     394        1414 :     maybe_send_schema(ctx, relation, relentry);
     395             : 
     396             :     /* Send the data */
     397        1414 :     switch (change->action)
     398             :     {
     399         782 :         case REORDER_BUFFER_CHANGE_INSERT:
     400             :             {
     401         782 :                 HeapTuple   tuple = &change->data.tp.newtuple->tuple;
     402             : 
     403             :                 /* Switch relation if publishing via root. */
     404         782 :                 if (relentry->publish_as_relid != RelationGetRelid(relation))
     405             :                 {
     406             :                     Assert(relation->rd_rel->relispartition);
     407           0 :                     relation = RelationIdGetRelation(relentry->publish_as_relid);
     408             :                     /* Convert tuple if needed. */
     409           0 :                     if (relentry->map)
     410           0 :                         tuple = execute_attr_map_tuple(tuple, relentry->map);
     411             :                 }
     412             : 
     413         782 :                 OutputPluginPrepareWrite(ctx, true);
     414         782 :                 logicalrep_write_insert(ctx->out, relation, tuple);
     415         782 :                 OutputPluginWrite(ctx, true);
     416         782 :                 break;
     417             :             }
     418         238 :         case REORDER_BUFFER_CHANGE_UPDATE:
     419             :             {
     420         476 :                 HeapTuple   oldtuple = change->data.tp.oldtuple ?
     421         238 :                 &change->data.tp.oldtuple->tuple : NULL;
     422         238 :                 HeapTuple   newtuple = &change->data.tp.newtuple->tuple;
     423             : 
     424             :                 /* Switch relation if publishing via root. */
     425         238 :                 if (relentry->publish_as_relid != RelationGetRelid(relation))
     426             :                 {
     427             :                     Assert(relation->rd_rel->relispartition);
     428           0 :                     relation = RelationIdGetRelation(relentry->publish_as_relid);
     429             :                     /* Convert tuples if needed. */
     430           0 :                     if (relentry->map)
     431             :                     {
     432           0 :                         oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
     433           0 :                         newtuple = execute_attr_map_tuple(newtuple, relentry->map);
     434             :                     }
     435             :                 }
     436             : 
     437         238 :                 OutputPluginPrepareWrite(ctx, true);
     438         238 :                 logicalrep_write_update(ctx->out, relation, oldtuple, newtuple);
     439         238 :                 OutputPluginWrite(ctx, true);
     440         238 :                 break;
     441             :             }
     442         394 :         case REORDER_BUFFER_CHANGE_DELETE:
     443         394 :             if (change->data.tp.oldtuple)
     444             :             {
     445         394 :                 HeapTuple   oldtuple = &change->data.tp.oldtuple->tuple;
     446             : 
     447             :                 /* Switch relation if publishing via root. */
     448         394 :                 if (relentry->publish_as_relid != RelationGetRelid(relation))
     449             :                 {
     450             :                     Assert(relation->rd_rel->relispartition);
     451           0 :                     relation = RelationIdGetRelation(relentry->publish_as_relid);
     452             :                     /* Convert tuple if needed. */
     453           0 :                     if (relentry->map)
     454           0 :                         oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
     455             :                 }
     456             : 
     457         394 :                 OutputPluginPrepareWrite(ctx, true);
     458         394 :                 logicalrep_write_delete(ctx->out, relation, oldtuple);
     459         394 :                 OutputPluginWrite(ctx, true);
     460             :             }
     461             :             else
     462           0 :                 elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
     463         394 :             break;
     464        1414 :         default:
     465             :             Assert(false);
     466             :     }
     467             : 
     468             :     /* Cleanup */
     469        1414 :     MemoryContextSwitchTo(old);
     470        1414 :     MemoryContextReset(data->context);
     471             : }
     472             : 
     473             : static void
     474          14 : pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     475             :                   int nrelations, Relation relations[], ReorderBufferChange *change)
     476             : {
     477          14 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
     478             :     MemoryContext old;
     479             :     RelationSyncEntry *relentry;
     480             :     int         i;
     481             :     int         nrelids;
     482             :     Oid        *relids;
     483             : 
     484          14 :     old = MemoryContextSwitchTo(data->context);
     485             : 
     486          14 :     relids = palloc0(nrelations * sizeof(Oid));
     487          14 :     nrelids = 0;
     488             : 
     489          36 :     for (i = 0; i < nrelations; i++)
     490             :     {
     491          22 :         Relation    relation = relations[i];
     492          22 :         Oid         relid = RelationGetRelid(relation);
     493             : 
     494          22 :         if (!is_publishable_relation(relation))
     495           0 :             continue;
     496             : 
     497          22 :         relentry = get_rel_sync_entry(data, relid);
     498             : 
     499          22 :         if (!relentry->pubactions.pubtruncate)
     500          12 :             continue;
     501             : 
     502             :         /*
     503             :          * Don't send partitions if the publication wants to send only the
     504             :          * root tables through it.
     505             :          */
     506          10 :         if (relation->rd_rel->relispartition &&
     507           8 :             relentry->publish_as_relid != relid)
     508           0 :             continue;
     509             : 
     510          10 :         relids[nrelids++] = relid;
     511          10 :         maybe_send_schema(ctx, relation, relentry);
     512             :     }
     513             : 
     514          14 :     if (nrelids > 0)
     515             :     {
     516           6 :         OutputPluginPrepareWrite(ctx, true);
     517          12 :         logicalrep_write_truncate(ctx->out,
     518             :                                   nrelids,
     519             :                                   relids,
     520           6 :                                   change->data.truncate.cascade,
     521           6 :                                   change->data.truncate.restart_seqs);
     522           6 :         OutputPluginWrite(ctx, true);
     523             :     }
     524             : 
     525          14 :     MemoryContextSwitchTo(old);
     526          14 :     MemoryContextReset(data->context);
     527          14 : }
     528             : 
     529             : /*
     530             :  * Currently we always forward.
     531             :  */
     532             : static bool
     533       15748 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
     534             :                        RepOriginId origin_id)
     535             : {
     536       15748 :     return false;
     537             : }
     538             : 
     539             : /*
     540             :  * Shutdown the output plugin.
     541             :  *
     542             :  * Note, we don't need to clean the data->context as it's child context
     543             :  * of the ctx->context so it will be cleaned up by logical decoding machinery.
     544             :  */
     545             : static void
     546         150 : pgoutput_shutdown(LogicalDecodingContext *ctx)
     547             : {
     548         150 :     if (RelationSyncCache)
     549             :     {
     550           0 :         hash_destroy(RelationSyncCache);
     551           0 :         RelationSyncCache = NULL;
     552             :     }
     553         150 : }
     554             : 
     555             : /*
     556             :  * Load publications from the list of publication names.
     557             :  */
     558             : static List *
     559          32 : LoadPublications(List *pubnames)
     560             : {
     561          32 :     List       *result = NIL;
     562             :     ListCell   *lc;
     563             : 
     564          66 :     foreach(lc, pubnames)
     565             :     {
     566          34 :         char       *pubname = (char *) lfirst(lc);
     567          34 :         Publication *pub = GetPublicationByName(pubname, false);
     568             : 
     569          34 :         result = lappend(result, pub);
     570             :     }
     571             : 
     572          32 :     return result;
     573             : }
     574             : 
     575             : /*
     576             :  * Publication cache invalidation callback.
     577             :  */
     578             : static void
     579          38 : publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
     580             : {
     581          38 :     publications_valid = false;
     582             : 
     583             :     /*
     584             :      * Also invalidate per-relation cache so that next time the filtering info
     585             :      * is checked it will be updated with the new publication settings.
     586             :      */
     587          38 :     rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
     588          38 : }
     589             : 
     590             : /*
     591             :  * Initialize the relation schema sync cache for a decoding session.
     592             :  *
     593             :  * The hash table is destroyed at the end of a decoding session. The
     594             :  * invalidation callbacks remain registered and will still be invoked,
     595             :  * but they will just see the NULL hash table global and take no action.
     596             :  */
     597             : static void
     598          40 : init_rel_sync_cache(MemoryContext cachectx)
     599             : {
     600             :     HASHCTL     ctl;
     601             :     MemoryContext old_ctxt;
     602             : 
     603          40 :     if (RelationSyncCache != NULL)
     604           0 :         return;
     605             : 
     606             :     /* Make a new hash table for the cache */
     607         560 :     MemSet(&ctl, 0, sizeof(ctl));
     608          40 :     ctl.keysize = sizeof(Oid);
     609          40 :     ctl.entrysize = sizeof(RelationSyncEntry);
     610          40 :     ctl.hcxt = cachectx;
     611             : 
     612          40 :     old_ctxt = MemoryContextSwitchTo(cachectx);
     613          40 :     RelationSyncCache = hash_create("logical replication output relation cache",
     614             :                                     128, &ctl,
     615             :                                     HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
     616          40 :     (void) MemoryContextSwitchTo(old_ctxt);
     617             : 
     618             :     Assert(RelationSyncCache != NULL);
     619             : 
     620          40 :     CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
     621          40 :     CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
     622             :                                   rel_sync_cache_publication_cb,
     623             :                                   (Datum) 0);
     624             : }
     625             : 
     626             : /*
     627             :  * Find or create entry in the relation schema cache.
     628             :  *
     629             :  * This looks up publications that the given relation is directly or
     630             :  * indirectly part of (the latter if it's really the relation's ancestor that
     631             :  * is part of a publication) and fills up the found entry with the information
     632             :  * about which operations to publish and whether to use an ancestor's schema
     633             :  * when publishing.
     634             :  */
     635             : static RelationSyncEntry *
     636        3620 : get_rel_sync_entry(PGOutputData *data, Oid relid)
     637             : {
     638             :     RelationSyncEntry *entry;
     639        3620 :     bool        am_partition = get_rel_relispartition(relid);
     640        3620 :     char        relkind = get_rel_relkind(relid);
     641             :     bool        found;
     642             :     MemoryContext oldctx;
     643             : 
     644             :     Assert(RelationSyncCache != NULL);
     645             : 
     646             :     /* Find cached relation info, creating if not found */
     647        3620 :     oldctx = MemoryContextSwitchTo(CacheMemoryContext);
     648        3620 :     entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
     649             :                                               (void *) &relid,
     650             :                                               HASH_ENTER, &found);
     651        3620 :     MemoryContextSwitchTo(oldctx);
     652             :     Assert(entry != NULL);
     653             : 
     654             :     /* (Re)build the entry if it is new or has been invalidated */
     655        3620 :     if (!found || !entry->replicate_valid)
     656             :     {
     657          94 :         List       *pubids = GetRelationPublications(relid);
     658             :         ListCell   *lc;
     659          94 :         Oid         publish_as_relid = relid;
     660             : 
     661             :         /* Reload publications if needed before use. */
     662          94 :         if (!publications_valid)
     663             :         {
     664          32 :             oldctx = MemoryContextSwitchTo(CacheMemoryContext);
     665          32 :             if (data->publications)
     666           4 :                 list_free_deep(data->publications);
     667             : 
     668          32 :             data->publications = LoadPublications(data->publication_names);
     669          32 :             MemoryContextSwitchTo(oldctx);
     670          32 :             publications_valid = true;
     671             :         }
     672             : 
     673             :         /*
     674             :          * Build publication cache. We can't use one provided by relcache as
     675             :          * relcache considers all publications given relation is in, but here
     676             :          * we only need to consider ones that the subscriber requested.
     677             :          */
     678          94 :         entry->pubactions.pubinsert = entry->pubactions.pubupdate =
     679          94 :             entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
     680             : 
     681         120 :         foreach(lc, data->publications)
     682             :         {
     683          96 :             Publication *pub = lfirst(lc);
     684          96 :             bool        publish = false;
     685             : 
     686          96 :             if (pub->alltables)
     687             :             {
     688          50 :                 publish = true;
     689          50 :                 if (pub->pubviaroot && am_partition)
     690           0 :                     publish_as_relid = llast_oid(get_partition_ancestors(relid));
     691             :             }
     692             : 
     693          96 :             if (!publish)
     694             :             {
     695          46 :                 bool        ancestor_published = false;
     696             : 
     697             :                 /*
     698             :                  * For a partition, check if any of the ancestors are
     699             :                  * published.  If so, note down the topmost ancestor that is
     700             :                  * published via this publication, which will be used as the
     701             :                  * relation via which to publish the partition's changes.
     702             :                  */
     703          46 :                 if (am_partition)
     704             :                 {
     705           6 :                     List       *ancestors = get_partition_ancestors(relid);
     706             :                     ListCell   *lc2;
     707             : 
     708             :                     /*
     709             :                      * Find the "topmost" ancestor that is in this
     710             :                      * publication.
     711             :                      */
     712          12 :                     foreach(lc2, ancestors)
     713             :                     {
     714           6 :                         Oid         ancestor = lfirst_oid(lc2);
     715             : 
     716           6 :                         if (list_member_oid(GetRelationPublications(ancestor),
     717             :                                             pub->oid))
     718             :                         {
     719           6 :                             ancestor_published = true;
     720           6 :                             if (pub->pubviaroot)
     721           0 :                                 publish_as_relid = ancestor;
     722             :                         }
     723             :                     }
     724             :                 }
     725             : 
     726          46 :                 if (list_member_oid(pubids, pub->oid) || ancestor_published)
     727          34 :                     publish = true;
     728             :             }
     729             : 
     730             :             /*
     731             :              * Don't publish changes for partitioned tables, because
     732             :              * publishing those of their partitions suffices, unless partition
     733             :              * changes won't be published due to pubviaroot being set.
     734             :              */
     735          96 :             if (publish &&
     736           2 :                 (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
     737             :             {
     738          82 :                 entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
     739          82 :                 entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
     740          82 :                 entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
     741          82 :                 entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
     742             :             }
     743             : 
     744          96 :             if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
     745          70 :                 entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
     746          70 :                 break;
     747             :         }
     748             : 
     749          94 :         list_free(pubids);
     750             : 
     751          94 :         entry->publish_as_relid = publish_as_relid;
     752          94 :         entry->replicate_valid = true;
     753             :     }
     754             : 
     755        3620 :     if (!found)
     756          92 :         entry->schema_sent = false;
     757             : 
     758        3620 :     return entry;
     759             : }
     760             : 
     761             : /*
     762             :  * Relcache invalidation callback
     763             :  */
     764             : static void
     765        2326 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
     766             : {
     767             :     RelationSyncEntry *entry;
     768             : 
     769             :     /*
     770             :      * We can get here if the plugin was used in SQL interface as the
     771             :      * RelationSyncCache is destroyed when the decoding finishes, but there
     772             :      * is no way to unregister the relcache invalidation callback.
     773             :      */
     774        2326 :     if (RelationSyncCache == NULL)
     775           0 :         return;
     776             : 
     777             :     /*
     778             :      * Nobody keeps pointers to entries in this hash table around outside
     779             :      * logical decoding callback calls - but invalidation events can come in
     780             :      * *during* a callback if we access the relcache in the callback. Because
     781             :      * of that we must mark the cache entry as invalid but not remove it from
     782             :      * the hash while it could still be referenced, then prune it at a later
     783             :      * safe point.
     784             :      *
     785             :      * Getting invalidations for relations that aren't in the table is
     786             :      * entirely normal, since there's no way to unregister for an invalidation
     787             :      * event. So we don't care if it's found or not.
     788             :      */
     789        2326 :     entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
     790             :                                               HASH_FIND, NULL);
     791             : 
     792             :     /*
     793             :      * Reset schema sent status as the relation definition may have changed.
     794             :      */
     795        2326 :     if (entry != NULL)
     796         470 :         entry->schema_sent = false;
     797             : }
     798             : 
     799             : /*
     800             :  * Publication relation map syscache invalidation callback
     801             :  */
     802             : static void
     803          82 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
     804             : {
     805             :     HASH_SEQ_STATUS status;
     806             :     RelationSyncEntry *entry;
     807             : 
     808             :     /*
     809             :      * We can get here if the plugin was used in SQL interface as the
     810             :      * RelationSyncCache is destroyed when the decoding finishes, but there
     811             :      * is no way to unregister the syscache invalidation callback.
     812             :      */
     813          82 :     if (RelationSyncCache == NULL)
     814           0 :         return;
     815             : 
     816             :     /*
     817             :      * There is no way to find which entry in our cache the hash belongs to so
     818             :      * mark the whole cache as invalid.
     819             :      */
     820          82 :     hash_seq_init(&status, RelationSyncCache);
     821         346 :     while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
     822         264 :         entry->replicate_valid = false;
     823             : }

Generated by: LCOV version 1.13