LCOV - code coverage report
Current view: top level - src/backend/replication/pgoutput - pgoutput.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 199 219 90.9 %
Date: 2019-09-19 17:07:13 Functions: 17 17 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pgoutput.c
       4             :  *      Logical Replication output plugin
       5             :  *
       6             :  * Copyright (c) 2012-2019, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *        src/backend/replication/pgoutput/pgoutput.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "catalog/pg_publication.h"
      16             : 
      17             : #include "fmgr.h"
      18             : 
      19             : #include "replication/logical.h"
      20             : #include "replication/logicalproto.h"
      21             : #include "replication/origin.h"
      22             : #include "replication/pgoutput.h"
      23             : 
      24             : #include "utils/inval.h"
      25             : #include "utils/int8.h"
      26             : #include "utils/memutils.h"
      27             : #include "utils/syscache.h"
      28             : #include "utils/varlena.h"
      29             : 
      30         146 : PG_MODULE_MAGIC;
      31             : 
      32             : extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
      33             : 
      34             : static void pgoutput_startup(LogicalDecodingContext *ctx,
      35             :                              OutputPluginOptions *opt, bool is_init);
      36             : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
      37             : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
      38             :                                ReorderBufferTXN *txn);
      39             : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
      40             :                                 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      41             : static void pgoutput_change(LogicalDecodingContext *ctx,
      42             :                             ReorderBufferTXN *txn, Relation rel,
      43             :                             ReorderBufferChange *change);
      44             : static void pgoutput_truncate(LogicalDecodingContext *ctx,
      45             :                               ReorderBufferTXN *txn, int nrelations, Relation relations[],
      46             :                               ReorderBufferChange *change);
      47             : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
      48             :                                    RepOriginId origin_id);
      49             : 
      50             : static bool publications_valid;
      51             : 
      52             : static List *LoadPublications(List *pubnames);
      53             : static void publication_invalidation_cb(Datum arg, int cacheid,
      54             :                                         uint32 hashvalue);
      55             : 
      56             : /* Entry in the map used to remember which relation schemas we sent. */
      57             : typedef struct RelationSyncEntry
      58             : {
      59             :     Oid         relid;          /* relation oid */
      60             :     bool        schema_sent;    /* did we send the schema? */
      61             :     bool        replicate_valid;
      62             :     PublicationActions pubactions;
      63             : } RelationSyncEntry;
      64             : 
      65             : /* Map used to remember which relation schemas we sent. */
      66             : static HTAB *RelationSyncCache = NULL;
      67             : 
      68             : static void init_rel_sync_cache(MemoryContext decoding_context);
      69             : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
      70             : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
      71             : static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
      72             :                                           uint32 hashvalue);
      73             : 
      74             : /*
      75             :  * Specify output plugin callbacks
      76             :  */
      77             : void
      78         154 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
      79             : {
      80             :     AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
      81             : 
      82         154 :     cb->startup_cb = pgoutput_startup;
      83         154 :     cb->begin_cb = pgoutput_begin_txn;
      84         154 :     cb->change_cb = pgoutput_change;
      85         154 :     cb->truncate_cb = pgoutput_truncate;
      86         154 :     cb->commit_cb = pgoutput_commit_txn;
      87         154 :     cb->filter_by_origin_cb = pgoutput_origin_filter;
      88         154 :     cb->shutdown_cb = pgoutput_shutdown;
      89         154 : }
      90             : 
      91             : static void
      92          40 : parse_output_parameters(List *options, uint32 *protocol_version,
      93             :                         List **publication_names)
      94             : {
      95             :     ListCell   *lc;
      96          40 :     bool        protocol_version_given = false;
      97          40 :     bool        publication_names_given = false;
      98             : 
      99         120 :     foreach(lc, options)
     100             :     {
     101          80 :         DefElem    *defel = (DefElem *) lfirst(lc);
     102             : 
     103             :         Assert(defel->arg == NULL || IsA(defel->arg, String));
     104             : 
     105             :         /* Check each param, whether or not we recognize it */
     106          80 :         if (strcmp(defel->defname, "proto_version") == 0)
     107             :         {
     108             :             int64       parsed;
     109             : 
     110          40 :             if (protocol_version_given)
     111           0 :                 ereport(ERROR,
     112             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     113             :                          errmsg("conflicting or redundant options")));
     114          40 :             protocol_version_given = true;
     115             : 
     116          40 :             if (!scanint8(strVal(defel->arg), true, &parsed))
     117           0 :                 ereport(ERROR,
     118             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     119             :                          errmsg("invalid proto_version")));
     120             : 
     121          40 :             if (parsed > PG_UINT32_MAX || parsed < 0)
     122           0 :                 ereport(ERROR,
     123             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     124             :                          errmsg("proto_version \"%s\" out of range",
     125             :                                 strVal(defel->arg))));
     126             : 
     127          40 :             *protocol_version = (uint32) parsed;
     128             :         }
     129          40 :         else if (strcmp(defel->defname, "publication_names") == 0)
     130             :         {
     131          40 :             if (publication_names_given)
     132           0 :                 ereport(ERROR,
     133             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     134             :                          errmsg("conflicting or redundant options")));
     135          40 :             publication_names_given = true;
     136             : 
     137          40 :             if (!SplitIdentifierString(strVal(defel->arg), ',',
     138             :                                        publication_names))
     139           0 :                 ereport(ERROR,
     140             :                         (errcode(ERRCODE_INVALID_NAME),
     141             :                          errmsg("invalid publication_names syntax")));
     142             :         }
     143             :         else
     144           0 :             elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
     145             :     }
     146          40 : }
     147             : 
     148             : /*
     149             :  * Initialize this plugin
     150             :  */
     151             : static void
     152         154 : pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
     153             :                  bool is_init)
     154             : {
     155         154 :     PGOutputData *data = palloc0(sizeof(PGOutputData));
     156             : 
     157             :     /* Create our memory context for private allocations. */
     158         154 :     data->context = AllocSetContextCreate(ctx->context,
     159             :                                           "logical replication output context",
     160             :                                           ALLOCSET_DEFAULT_SIZES);
     161             : 
     162         154 :     ctx->output_plugin_private = data;
     163             : 
     164             :     /* This plugin uses binary protocol. */
     165         154 :     opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
     166             : 
     167             :     /*
     168             :      * This is replication start and not slot initialization.
     169             :      *
     170             :      * Parse and validate options passed by the client.
     171             :      */
     172         154 :     if (!is_init)
     173             :     {
     174             :         /* Parse the params and ERROR if we see any we don't recognize */
     175          40 :         parse_output_parameters(ctx->output_plugin_options,
     176             :                                 &data->protocol_version,
     177             :                                 &data->publication_names);
     178             : 
     179             :         /* Check if we support requested protocol */
     180          40 :         if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM)
     181           0 :             ereport(ERROR,
     182             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     183             :                      errmsg("client sent proto_version=%d but we only support protocol %d or lower",
     184             :                             data->protocol_version, LOGICALREP_PROTO_VERSION_NUM)));
     185             : 
     186          40 :         if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
     187           0 :             ereport(ERROR,
     188             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     189             :                      errmsg("client sent proto_version=%d but we only support protocol %d or higher",
     190             :                             data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
     191             : 
     192          40 :         if (list_length(data->publication_names) < 1)
     193           0 :             ereport(ERROR,
     194             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     195             :                      errmsg("publication_names parameter missing")));
     196             : 
     197             :         /* Init publication state. */
     198          40 :         data->publications = NIL;
     199          40 :         publications_valid = false;
     200          40 :         CacheRegisterSyscacheCallback(PUBLICATIONOID,
     201             :                                       publication_invalidation_cb,
     202             :                                       (Datum) 0);
     203             : 
     204             :         /* Initialize relation schema cache. */
     205          40 :         init_rel_sync_cache(CacheMemoryContext);
     206             :     }
     207         154 : }
     208             : 
     209             : /*
     210             :  * BEGIN callback
     211             :  */
     212             : static void
     213         234 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     214             : {
     215         234 :     bool        send_replication_origin = txn->origin_id != InvalidRepOriginId;
     216             : 
     217         234 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
     218         234 :     logicalrep_write_begin(ctx->out, txn);
     219             : 
     220         234 :     if (send_replication_origin)
     221             :     {
     222             :         char       *origin;
     223             : 
     224             :         /* Message boundary */
     225           0 :         OutputPluginWrite(ctx, false);
     226           0 :         OutputPluginPrepareWrite(ctx, true);
     227             : 
     228             :         /*----------
     229             :          * XXX: which behaviour do we want here?
     230             :          *
     231             :          * Alternatives:
     232             :          *  - don't send origin message if origin name not found
     233             :          *    (that's what we do now)
     234             :          *  - throw error - that will break replication, not good
     235             :          *  - send some special "unknown" origin
     236             :          *----------
     237             :          */
     238           0 :         if (replorigin_by_oid(txn->origin_id, true, &origin))
     239           0 :             logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
     240             :     }
     241             : 
     242         234 :     OutputPluginWrite(ctx, true);
     243         234 : }
     244             : 
     245             : /*
     246             :  * COMMIT callback
     247             :  */
     248             : static void
     249         234 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     250             :                     XLogRecPtr commit_lsn)
     251             : {
     252         234 :     OutputPluginUpdateProgress(ctx);
     253             : 
     254         234 :     OutputPluginPrepareWrite(ctx, true);
     255         234 :     logicalrep_write_commit(ctx->out, txn, commit_lsn);
     256         234 :     OutputPluginWrite(ctx, true);
     257         234 : }
     258             : 
     259             : /*
     260             :  * Write the relation schema if the current schema hasn't been sent yet.
     261             :  */
     262             : static void
     263        1322 : maybe_send_schema(LogicalDecodingContext *ctx,
     264             :                   Relation relation, RelationSyncEntry *relentry)
     265             : {
     266        1322 :     if (!relentry->schema_sent)
     267             :     {
     268             :         TupleDesc   desc;
     269             :         int         i;
     270             : 
     271          76 :         desc = RelationGetDescr(relation);
     272             : 
     273             :         /*
     274             :          * Write out type info if needed.  We do that only for user-created
     275             :          * types.  We use FirstGenbkiObjectId as the cutoff, so that we only
     276             :          * consider objects with hand-assigned OIDs to be "built in", not for
     277             :          * instance any function or type defined in the information_schema.
     278             :          * This is important because only hand-assigned OIDs can be expected
     279             :          * to remain stable across major versions.
     280             :          */
     281         210 :         for (i = 0; i < desc->natts; i++)
     282             :         {
     283         134 :             Form_pg_attribute att = TupleDescAttr(desc, i);
     284             : 
     285         134 :             if (att->attisdropped || att->attgenerated)
     286           0 :                 continue;
     287             : 
     288         134 :             if (att->atttypid < FirstGenbkiObjectId)
     289         102 :                 continue;
     290             : 
     291          32 :             OutputPluginPrepareWrite(ctx, false);
     292          32 :             logicalrep_write_typ(ctx->out, att->atttypid);
     293          32 :             OutputPluginWrite(ctx, false);
     294             :         }
     295             : 
     296          76 :         OutputPluginPrepareWrite(ctx, false);
     297          76 :         logicalrep_write_rel(ctx->out, relation);
     298          76 :         OutputPluginWrite(ctx, false);
     299          76 :         relentry->schema_sent = true;
     300             :     }
     301        1322 : }
     302             : 
     303             : /*
     304             :  * Sends the decoded DML over the wire.
     305             :  */
     306             : static void
     307        3508 : pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     308             :                 Relation relation, ReorderBufferChange *change)
     309             : {
     310        3508 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
     311             :     MemoryContext old;
     312             :     RelationSyncEntry *relentry;
     313             : 
     314        3508 :     if (!is_publishable_relation(relation))
     315           4 :         return;
     316             : 
     317        3504 :     relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
     318             : 
     319             :     /* First check the table filter */
     320        3504 :     switch (change->action)
     321             :     {
     322             :         case REORDER_BUFFER_CHANGE_INSERT:
     323         726 :             if (!relentry->pubactions.pubinsert)
     324           0 :                 return;
     325         726 :             break;
     326             :         case REORDER_BUFFER_CHANGE_UPDATE:
     327         292 :             if (!relentry->pubactions.pubupdate)
     328          80 :                 return;
     329         212 :             break;
     330             :         case REORDER_BUFFER_CHANGE_DELETE:
     331        2486 :             if (!relentry->pubactions.pubdelete)
     332        2104 :                 return;
     333         382 :             break;
     334             :         default:
     335             :             Assert(false);
     336             :     }
     337             : 
     338             :     /* Avoid leaking memory by using and resetting our own context */
     339        1320 :     old = MemoryContextSwitchTo(data->context);
     340             : 
     341        1320 :     maybe_send_schema(ctx, relation, relentry);
     342             : 
     343             :     /* Send the data */
     344        1320 :     switch (change->action)
     345             :     {
     346             :         case REORDER_BUFFER_CHANGE_INSERT:
     347         726 :             OutputPluginPrepareWrite(ctx, true);
     348         726 :             logicalrep_write_insert(ctx->out, relation,
     349         726 :                                     &change->data.tp.newtuple->tuple);
     350         726 :             OutputPluginWrite(ctx, true);
     351         726 :             break;
     352             :         case REORDER_BUFFER_CHANGE_UPDATE:
     353             :             {
     354         424 :                 HeapTuple   oldtuple = change->data.tp.oldtuple ?
     355         212 :                 &change->data.tp.oldtuple->tuple : NULL;
     356             : 
     357         212 :                 OutputPluginPrepareWrite(ctx, true);
     358         212 :                 logicalrep_write_update(ctx->out, relation, oldtuple,
     359         212 :                                         &change->data.tp.newtuple->tuple);
     360         212 :                 OutputPluginWrite(ctx, true);
     361         212 :                 break;
     362             :             }
     363             :         case REORDER_BUFFER_CHANGE_DELETE:
     364         382 :             if (change->data.tp.oldtuple)
     365             :             {
     366         382 :                 OutputPluginPrepareWrite(ctx, true);
     367         382 :                 logicalrep_write_delete(ctx->out, relation,
     368         382 :                                         &change->data.tp.oldtuple->tuple);
     369         382 :                 OutputPluginWrite(ctx, true);
     370             :             }
     371             :             else
     372           0 :                 elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
     373         382 :             break;
     374             :         default:
     375             :             Assert(false);
     376             :     }
     377             : 
     378             :     /* Cleanup */
     379        1320 :     MemoryContextSwitchTo(old);
     380        1320 :     MemoryContextReset(data->context);
     381             : }
     382             : 
     383             : static void
     384          10 : pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     385             :                   int nrelations, Relation relations[], ReorderBufferChange *change)
     386             : {
     387          10 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
     388             :     MemoryContext old;
     389             :     RelationSyncEntry *relentry;
     390             :     int         i;
     391             :     int         nrelids;
     392             :     Oid        *relids;
     393             : 
     394          10 :     old = MemoryContextSwitchTo(data->context);
     395             : 
     396          10 :     relids = palloc0(nrelations * sizeof(Oid));
     397          10 :     nrelids = 0;
     398             : 
     399          22 :     for (i = 0; i < nrelations; i++)
     400             :     {
     401          12 :         Relation    relation = relations[i];
     402          12 :         Oid         relid = RelationGetRelid(relation);
     403             : 
     404          12 :         if (!is_publishable_relation(relation))
     405           0 :             continue;
     406             : 
     407          12 :         relentry = get_rel_sync_entry(data, relid);
     408             : 
     409          12 :         if (!relentry->pubactions.pubtruncate)
     410          10 :             continue;
     411             : 
     412           2 :         relids[nrelids++] = relid;
     413           2 :         maybe_send_schema(ctx, relation, relentry);
     414             :     }
     415             : 
     416          10 :     if (nrelids > 0)
     417             :     {
     418           2 :         OutputPluginPrepareWrite(ctx, true);
     419           4 :         logicalrep_write_truncate(ctx->out,
     420             :                                   nrelids,
     421             :                                   relids,
     422           2 :                                   change->data.truncate.cascade,
     423           2 :                                   change->data.truncate.restart_seqs);
     424           2 :         OutputPluginWrite(ctx, true);
     425             :     }
     426             : 
     427          10 :     MemoryContextSwitchTo(old);
     428          10 :     MemoryContextReset(data->context);
     429          10 : }
     430             : 
     431             : /*
     432             :  * Currently we always forward.
     433             :  */
     434             : static bool
     435       15210 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
     436             :                        RepOriginId origin_id)
     437             : {
     438       15210 :     return false;
     439             : }
     440             : 
     441             : /*
     442             :  * Shutdown the output plugin.
     443             :  *
     444             :  * Note, we don't need to clean up data->context, as it is a child context
     445             :  * of ctx->context and so will be cleaned up by the logical decoding machinery.
     446             :  */
     447             : static void
     448         116 : pgoutput_shutdown(LogicalDecodingContext *ctx)
     449             : {
     450         116 :     if (RelationSyncCache)
     451             :     {
     452           2 :         hash_destroy(RelationSyncCache);
     453           2 :         RelationSyncCache = NULL;
     454             :     }
     455         116 : }
     456             : 
     457             : /*
     458             :  * Load publications from the list of publication names.
     459             :  */
     460             : static List *
     461          30 : LoadPublications(List *pubnames)
     462             : {
     463          30 :     List       *result = NIL;
     464             :     ListCell   *lc;
     465             : 
     466          62 :     foreach(lc, pubnames)
     467             :     {
     468          32 :         char       *pubname = (char *) lfirst(lc);
     469          32 :         Publication *pub = GetPublicationByName(pubname, false);
     470             : 
     471          32 :         result = lappend(result, pub);
     472             :     }
     473             : 
     474          30 :     return result;
     475             : }
     476             : 
     477             : /*
     478             :  * Publication cache invalidation callback.
     479             :  */
     480             : static void
     481          14 : publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
     482             : {
     483          14 :     publications_valid = false;
     484             : 
     485             :     /*
     486             :      * Also invalidate per-relation cache so that next time the filtering info
     487             :      * is checked it will be updated with the new publication settings.
     488             :      */
     489          14 :     rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
     490          14 : }
     491             : 
     492             : /*
     493             :  * Initialize the relation schema sync cache for a decoding session.
     494             :  *
     495             :  * The hash table is destroyed at the end of a decoding session. While
     496             :  * relcache invalidations still exist and will still be invoked, they
     497             :  * will just see the null hash table global and take no action.
     498             :  */
     499             : static void
     500          40 : init_rel_sync_cache(MemoryContext cachectx)
     501             : {
     502             :     HASHCTL     ctl;
     503             :     MemoryContext old_ctxt;
     504             : 
     505          40 :     if (RelationSyncCache != NULL)
     506           0 :         return;
     507             : 
     508             :     /* Make a new hash table for the cache */
     509          40 :     MemSet(&ctl, 0, sizeof(ctl));
     510          40 :     ctl.keysize = sizeof(Oid);
     511          40 :     ctl.entrysize = sizeof(RelationSyncEntry);
     512          40 :     ctl.hcxt = cachectx;
     513             : 
     514          40 :     old_ctxt = MemoryContextSwitchTo(cachectx);
     515          40 :     RelationSyncCache = hash_create("logical replication output relation cache",
     516             :                                     128, &ctl,
     517             :                                     HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
     518          40 :     (void) MemoryContextSwitchTo(old_ctxt);
     519             : 
     520             :     Assert(RelationSyncCache != NULL);
     521             : 
     522          40 :     CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
     523          40 :     CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
     524             :                                   rel_sync_cache_publication_cb,
     525             :                                   (Datum) 0);
     526             : }
     527             : 
     528             : /*
     529             :  * Find or create entry in the relation schema cache.
     530             :  */
     531             : static RelationSyncEntry *
     532        3516 : get_rel_sync_entry(PGOutputData *data, Oid relid)
     533             : {
     534             :     RelationSyncEntry *entry;
     535             :     bool        found;
     536             :     MemoryContext oldctx;
     537             : 
     538             :     Assert(RelationSyncCache != NULL);
     539             : 
     540             :     /* Find the cached relation info, creating it if not found */
     541        3516 :     oldctx = MemoryContextSwitchTo(CacheMemoryContext);
     542        3516 :     entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
     543             :                                               (void *) &relid,
     544             :                                               HASH_ENTER, &found);
     545        3516 :     MemoryContextSwitchTo(oldctx);
     546             :     Assert(entry != NULL);
     547             : 
     548             :     /* Not found means schema wasn't sent */
     549        3516 :     if (!found || !entry->replicate_valid)
     550             :     {
     551          80 :         List       *pubids = GetRelationPublications(relid);
     552             :         ListCell   *lc;
     553             : 
     554             :         /* Reload publications if needed before use. */
     555          80 :         if (!publications_valid)
     556             :         {
     557          30 :             oldctx = MemoryContextSwitchTo(CacheMemoryContext);
     558          30 :             if (data->publications)
     559           4 :                 list_free_deep(data->publications);
     560             : 
     561          30 :             data->publications = LoadPublications(data->publication_names);
     562          30 :             MemoryContextSwitchTo(oldctx);
     563          30 :             publications_valid = true;
     564             :         }
     565             : 
     566             :         /*
     567             :          * Build publication cache. We can't use the one provided by relcache, as
     568             :          * relcache considers all publications the given relation is in, but here
     569             :          * we only need to consider the ones that the subscriber requested.
     570             :          */
     571          80 :         entry->pubactions.pubinsert = entry->pubactions.pubupdate =
     572          80 :             entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
     573             : 
     574         104 :         foreach(lc, data->publications)
     575             :         {
     576          82 :             Publication *pub = lfirst(lc);
     577             : 
     578          82 :             if (pub->alltables || list_member_oid(pubids, pub->oid))
     579             :             {
     580          70 :                 entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
     581          70 :                 entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
     582          70 :                 entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
     583          70 :                 entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
     584             :             }
     585             : 
     586         140 :             if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
     587         116 :                 entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
     588          58 :                 break;
     589             :         }
     590             : 
     591          80 :         list_free(pubids);
     592             : 
     593          80 :         entry->replicate_valid = true;
     594             :     }
     595             : 
     596        3516 :     if (!found)
     597          78 :         entry->schema_sent = false;
     598             : 
     599        3516 :     return entry;
     600             : }
     601             : 
     602             : /*
     603             :  * Relcache invalidation callback
     604             :  */
     605             : static void
     606         800 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
     607             : {
     608             :     RelationSyncEntry *entry;
     609             : 
     610             :     /*
     611             :      * We can get here if the plugin was used via the SQL interface, because
     612             :      * RelationSyncCache is destroyed when decoding finishes, but there
     613             :      * is no way to unregister the relcache invalidation callback.
     614             :      */
     615         800 :     if (RelationSyncCache == NULL)
     616           0 :         return;
     617             : 
     618             :     /*
     619             :      * Nobody keeps pointers to entries in this hash table around outside
     620             :      * logical decoding callback calls - but invalidation events can come in
     621             :      * *during* a callback if we access the relcache in the callback. Because
     622             :      * of that we must mark the cache entry as invalid but not remove it from
     623             :      * the hash while it could still be referenced, then prune it at a later
     624             :      * safe point.
     625             :      *
     626             :      * Getting invalidations for relations that aren't in the table is
     627             :      * entirely normal, since there's no way to unregister for an invalidation
     628             :      * event. So we don't care if it's found or not.
     629             :      */
     630         800 :     entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
     631             :                                               HASH_FIND, NULL);
     632             : 
     633             :     /*
     634             :      * Reset schema sent status as the relation definition may have changed.
     635             :      */
     636         800 :     if (entry != NULL)
     637         252 :         entry->schema_sent = false;
     638             : }
     639             : 
     640             : /*
     641             :  * Publication relation map syscache invalidation callback
     642             :  */
     643             : static void
     644          22 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
     645             : {
     646             :     HASH_SEQ_STATUS status;
     647             :     RelationSyncEntry *entry;
     648             : 
     649             :     /*
     650             :      * We can get here if the plugin was used via the SQL interface, because
     651             :      * RelationSyncCache is destroyed when decoding finishes, but there
     652             :      * is no way to unregister the syscache invalidation callback.
     653             :      */
     654          22 :     if (RelationSyncCache == NULL)
     655           0 :         return;
     656             : 
     657             :     /*
     658             :      * There is no way to find which entry in our cache the hash belongs to,
     659             :      * so mark the whole cache as invalid.
     660             :      */
     661          22 :     hash_seq_init(&status, RelationSyncCache);
     662          68 :     while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
     663          24 :         entry->replicate_valid = false;
     664             : }

Generated by: LCOV version 1.13