LCOV - code coverage report
Current view: top level - src/backend/replication/pgoutput - pgoutput.c (source / functions) Hit Total Coverage
Test: PostgreSQL 12beta2 Lines: 197 219 90.0 %
Date: 2019-06-18 07:06:57 Functions: 17 17 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pgoutput.c
       4             :  *      Logical Replication output plugin
       5             :  *
       6             :  * Copyright (c) 2012-2019, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *        src/backend/replication/pgoutput/pgoutput.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "catalog/pg_publication.h"
      16             : 
      17             : #include "replication/logical.h"
      18             : #include "replication/logicalproto.h"
      19             : #include "replication/origin.h"
      20             : #include "replication/pgoutput.h"
      21             : 
      22             : #include "utils/inval.h"
      23             : #include "utils/int8.h"
      24             : #include "utils/memutils.h"
      25             : #include "utils/syscache.h"
      26             : #include "utils/varlena.h"
      27             : 
      28         148 : PG_MODULE_MAGIC;
      29             : 
      30             : extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
      31             : 
      32             : static void pgoutput_startup(LogicalDecodingContext *ctx,
      33             :                              OutputPluginOptions *opt, bool is_init);
      34             : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
      35             : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
      36             :                                ReorderBufferTXN *txn);
      37             : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
      38             :                                 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      39             : static void pgoutput_change(LogicalDecodingContext *ctx,
      40             :                             ReorderBufferTXN *txn, Relation rel,
      41             :                             ReorderBufferChange *change);
      42             : static void pgoutput_truncate(LogicalDecodingContext *ctx,
      43             :                               ReorderBufferTXN *txn, int nrelations, Relation relations[],
      44             :                               ReorderBufferChange *change);
      45             : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
      46             :                                    RepOriginId origin_id);
      47             : 
      48             : static bool publications_valid;
      49             : 
      50             : static List *LoadPublications(List *pubnames);
      51             : static void publication_invalidation_cb(Datum arg, int cacheid,
      52             :                                         uint32 hashvalue);
      53             : 
      54             : /* Entry in the map used to remember which relation schemas we sent. */
      55             : typedef struct RelationSyncEntry
      56             : {
      57             :     Oid         relid;          /* relation oid */
      58             :     bool        schema_sent;    /* did we send the schema? */
      59             :     bool        replicate_valid;
      60             :     PublicationActions pubactions;
      61             : } RelationSyncEntry;
      62             : 
      63             : /* Map used to remember which relation schemas we sent. */
      64             : static HTAB *RelationSyncCache = NULL;
      65             : 
      66             : static void init_rel_sync_cache(MemoryContext decoding_context);
      67             : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
      68             : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
      69             : static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
      70             :                                           uint32 hashvalue);
      71             : 
      72             : /*
      73             :  * Specify output plugin callbacks
      74             :  */
      75             : void
      76         154 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
      77             : {
      78             :     AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
      79             : 
      80         154 :     cb->startup_cb = pgoutput_startup;
      81         154 :     cb->begin_cb = pgoutput_begin_txn;
      82         154 :     cb->change_cb = pgoutput_change;
      83         154 :     cb->truncate_cb = pgoutput_truncate;
      84         154 :     cb->commit_cb = pgoutput_commit_txn;
      85         154 :     cb->filter_by_origin_cb = pgoutput_origin_filter;
      86         154 :     cb->shutdown_cb = pgoutput_shutdown;
      87         154 : }
      88             : 
      89             : static void
      90          38 : parse_output_parameters(List *options, uint32 *protocol_version,
      91             :                         List **publication_names)
      92             : {
      93             :     ListCell   *lc;
      94          38 :     bool        protocol_version_given = false;
      95          38 :     bool        publication_names_given = false;
      96             : 
      97         114 :     foreach(lc, options)
      98             :     {
      99          76 :         DefElem    *defel = (DefElem *) lfirst(lc);
     100             : 
     101             :         Assert(defel->arg == NULL || IsA(defel->arg, String));
     102             : 
     103             :         /* Check each param, whether or not we recognize it */
     104          76 :         if (strcmp(defel->defname, "proto_version") == 0)
     105             :         {
     106             :             int64       parsed;
     107             : 
     108          38 :             if (protocol_version_given)
     109           0 :                 ereport(ERROR,
     110             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     111             :                          errmsg("conflicting or redundant options")));
     112          38 :             protocol_version_given = true;
     113             : 
     114          38 :             if (!scanint8(strVal(defel->arg), true, &parsed))
     115           0 :                 ereport(ERROR,
     116             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     117             :                          errmsg("invalid proto_version")));
     118             : 
     119          38 :             if (parsed > PG_UINT32_MAX || parsed < 0)
     120           0 :                 ereport(ERROR,
     121             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     122             :                          errmsg("proto_version \"%s\" out of range",
     123             :                                 strVal(defel->arg))));
     124             : 
     125          38 :             *protocol_version = (uint32) parsed;
     126             :         }
     127          38 :         else if (strcmp(defel->defname, "publication_names") == 0)
     128             :         {
     129          38 :             if (publication_names_given)
     130           0 :                 ereport(ERROR,
     131             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     132             :                          errmsg("conflicting or redundant options")));
     133          38 :             publication_names_given = true;
     134             : 
     135          38 :             if (!SplitIdentifierString(strVal(defel->arg), ',',
     136             :                                        publication_names))
     137           0 :                 ereport(ERROR,
     138             :                         (errcode(ERRCODE_INVALID_NAME),
     139             :                          errmsg("invalid publication_names syntax")));
     140             :         }
     141             :         else
     142           0 :             elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
     143             :     }
     144          38 : }
     145             : 
     146             : /*
     147             :  * Initialize this plugin
     148             :  */
     149             : static void
     150         154 : pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
     151             :                  bool is_init)
     152             : {
     153         154 :     PGOutputData *data = palloc0(sizeof(PGOutputData));
     154             : 
     155             :     /* Create our memory context for private allocations. */
     156         154 :     data->context = AllocSetContextCreate(ctx->context,
     157             :                                           "logical replication output context",
     158             :                                           ALLOCSET_DEFAULT_SIZES);
     159             : 
     160         154 :     ctx->output_plugin_private = data;
     161             : 
     162             :     /* This plugin uses binary protocol. */
     163         154 :     opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
     164             : 
     165             :     /*
     166             :      * This is replication start and not slot initialization.
     167             :      *
     168             :      * Parse and validate options passed by the client.
     169             :      */
     170         154 :     if (!is_init)
     171             :     {
     172             :         /* Parse the params and ERROR if we see any we don't recognize */
     173          38 :         parse_output_parameters(ctx->output_plugin_options,
     174             :                                 &data->protocol_version,
     175             :                                 &data->publication_names);
     176             : 
     177             :         /* Check if we support requested protocol */
     178          38 :         if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM)
     179           0 :             ereport(ERROR,
     180             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     181             :                      errmsg("client sent proto_version=%d but we only support protocol %d or lower",
     182             :                             data->protocol_version, LOGICALREP_PROTO_VERSION_NUM)));
     183             : 
     184          38 :         if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
     185           0 :             ereport(ERROR,
     186             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     187             :                      errmsg("client sent proto_version=%d but we only support protocol %d or higher",
     188             :                             data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
     189             : 
     190          38 :         if (list_length(data->publication_names) < 1)
     191           0 :             ereport(ERROR,
     192             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     193             :                      errmsg("publication_names parameter missing")));
     194             : 
     195             :         /* Init publication state. */
     196          38 :         data->publications = NIL;
     197          38 :         publications_valid = false;
     198          38 :         CacheRegisterSyscacheCallback(PUBLICATIONOID,
     199             :                                       publication_invalidation_cb,
     200             :                                       (Datum) 0);
     201             : 
     202             :         /* Initialize relation schema cache. */
     203          38 :         init_rel_sync_cache(CacheMemoryContext);
     204             :     }
     205         154 : }
     206             : 
     207             : /*
     208             :  * BEGIN callback
     209             :  */
     210             : static void
     211         234 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     212             : {
     213         234 :     bool        send_replication_origin = txn->origin_id != InvalidRepOriginId;
     214             : 
     215         234 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
     216         234 :     logicalrep_write_begin(ctx->out, txn);
     217             : 
     218         234 :     if (send_replication_origin)
     219             :     {
     220             :         char       *origin;
     221             : 
     222             :         /* Message boundary */
     223           0 :         OutputPluginWrite(ctx, false);
     224           0 :         OutputPluginPrepareWrite(ctx, true);
     225             : 
     226             :         /*----------
     227             :          * XXX: which behaviour do we want here?
     228             :          *
     229             :          * Alternatives:
     230             :          *  - don't send origin message if origin name not found
     231             :          *    (that's what we do now)
     232             :          *  - throw error - that will break replication, not good
     233             :          *  - send some special "unknown" origin
     234             :          *----------
     235             :          */
     236           0 :         if (replorigin_by_oid(txn->origin_id, true, &origin))
     237           0 :             logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
     238             :     }
     239             : 
     240         234 :     OutputPluginWrite(ctx, true);
     241         234 : }
     242             : 
     243             : /*
     244             :  * COMMIT callback
     245             :  */
     246             : static void
     247         234 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     248             :                     XLogRecPtr commit_lsn)
     249             : {
     250         234 :     OutputPluginUpdateProgress(ctx);
     251             : 
     252         234 :     OutputPluginPrepareWrite(ctx, true);
     253         234 :     logicalrep_write_commit(ctx->out, txn, commit_lsn);
     254         234 :     OutputPluginWrite(ctx, true);
     255         234 : }
     256             : 
     257             : /*
     258             :  * Write the relation schema if the current schema hasn't been sent yet.
     259             :  */
     260             : static void
     261        1322 : maybe_send_schema(LogicalDecodingContext *ctx,
     262             :                   Relation relation, RelationSyncEntry *relentry)
     263             : {
     264        1322 :     if (!relentry->schema_sent)
     265             :     {
     266             :         TupleDesc   desc;
     267             :         int         i;
     268             : 
     269          76 :         desc = RelationGetDescr(relation);
     270             : 
     271             :         /*
     272             :          * Write out type info if needed.  We do that only for user-created
     273             :          * types.  We use FirstGenbkiObjectId as the cutoff, so that we only
     274             :          * consider objects with hand-assigned OIDs to be "built in", not for
     275             :          * instance any function or type defined in the information_schema.
     276             :          * This is important because only hand-assigned OIDs can be expected
     277             :          * to remain stable across major versions.
     278             :          */
     279         210 :         for (i = 0; i < desc->natts; i++)
     280             :         {
     281         134 :             Form_pg_attribute att = TupleDescAttr(desc, i);
     282             : 
     283         134 :             if (att->attisdropped || att->attgenerated)
     284           0 :                 continue;
     285             : 
     286         134 :             if (att->atttypid < FirstGenbkiObjectId)
     287         102 :                 continue;
     288             : 
     289          32 :             OutputPluginPrepareWrite(ctx, false);
     290          32 :             logicalrep_write_typ(ctx->out, att->atttypid);
     291          32 :             OutputPluginWrite(ctx, false);
     292             :         }
     293             : 
     294          76 :         OutputPluginPrepareWrite(ctx, false);
     295          76 :         logicalrep_write_rel(ctx->out, relation);
     296          76 :         OutputPluginWrite(ctx, false);
     297          76 :         relentry->schema_sent = true;
     298             :     }
     299        1322 : }
     300             : 
     301             : /*
     302             :  * Sends the decoded DML over wire.
     303             :  */
     304             : static void
     305        3508 : pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     306             :                 Relation relation, ReorderBufferChange *change)
     307             : {
     308        3508 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
     309             :     MemoryContext old;
     310             :     RelationSyncEntry *relentry;
     311             : 
     312        3508 :     if (!is_publishable_relation(relation))
     313           4 :         return;
     314             : 
     315        3504 :     relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
     316             : 
     317             :     /* First check the table filter */
     318        3504 :     switch (change->action)
     319             :     {
     320             :         case REORDER_BUFFER_CHANGE_INSERT:
     321         726 :             if (!relentry->pubactions.pubinsert)
     322           0 :                 return;
     323         726 :             break;
     324             :         case REORDER_BUFFER_CHANGE_UPDATE:
     325         292 :             if (!relentry->pubactions.pubupdate)
     326          80 :                 return;
     327         212 :             break;
     328             :         case REORDER_BUFFER_CHANGE_DELETE:
     329        2486 :             if (!relentry->pubactions.pubdelete)
     330        2104 :                 return;
     331         382 :             break;
     332             :         default:
     333             :             Assert(false);
     334             :     }
     335             : 
     336             :     /* Avoid leaking memory by using and resetting our own context */
     337        1320 :     old = MemoryContextSwitchTo(data->context);
     338             : 
     339        1320 :     maybe_send_schema(ctx, relation, relentry);
     340             : 
     341             :     /* Send the data */
     342        1320 :     switch (change->action)
     343             :     {
     344             :         case REORDER_BUFFER_CHANGE_INSERT:
     345         726 :             OutputPluginPrepareWrite(ctx, true);
     346         726 :             logicalrep_write_insert(ctx->out, relation,
     347         726 :                                     &change->data.tp.newtuple->tuple);
     348         726 :             OutputPluginWrite(ctx, true);
     349         726 :             break;
     350             :         case REORDER_BUFFER_CHANGE_UPDATE:
     351             :             {
     352         424 :                 HeapTuple   oldtuple = change->data.tp.oldtuple ?
     353         212 :                 &change->data.tp.oldtuple->tuple : NULL;
     354             : 
     355         212 :                 OutputPluginPrepareWrite(ctx, true);
     356         212 :                 logicalrep_write_update(ctx->out, relation, oldtuple,
     357         212 :                                         &change->data.tp.newtuple->tuple);
     358         212 :                 OutputPluginWrite(ctx, true);
     359         212 :                 break;
     360             :             }
     361             :         case REORDER_BUFFER_CHANGE_DELETE:
     362         382 :             if (change->data.tp.oldtuple)
     363             :             {
     364         382 :                 OutputPluginPrepareWrite(ctx, true);
     365         382 :                 logicalrep_write_delete(ctx->out, relation,
     366         382 :                                         &change->data.tp.oldtuple->tuple);
     367         382 :                 OutputPluginWrite(ctx, true);
     368             :             }
     369             :             else
     370           0 :                 elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
     371         382 :             break;
     372             :         default:
     373             :             Assert(false);
     374             :     }
     375             : 
     376             :     /* Cleanup */
     377        1320 :     MemoryContextSwitchTo(old);
     378        1320 :     MemoryContextReset(data->context);
     379             : }
     380             : 
     381             : static void
     382          10 : pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     383             :                   int nrelations, Relation relations[], ReorderBufferChange *change)
     384             : {
     385          10 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
     386             :     MemoryContext old;
     387             :     RelationSyncEntry *relentry;
     388             :     int         i;
     389             :     int         nrelids;
     390             :     Oid        *relids;
     391             : 
     392          10 :     old = MemoryContextSwitchTo(data->context);
     393             : 
     394          10 :     relids = palloc0(nrelations * sizeof(Oid));
     395          10 :     nrelids = 0;
     396             : 
     397          22 :     for (i = 0; i < nrelations; i++)
     398             :     {
     399          12 :         Relation    relation = relations[i];
     400          12 :         Oid         relid = RelationGetRelid(relation);
     401             : 
     402          12 :         if (!is_publishable_relation(relation))
     403           0 :             continue;
     404             : 
     405          12 :         relentry = get_rel_sync_entry(data, relid);
     406             : 
     407          12 :         if (!relentry->pubactions.pubtruncate)
     408          10 :             continue;
     409             : 
     410           2 :         relids[nrelids++] = relid;
     411           2 :         maybe_send_schema(ctx, relation, relentry);
     412             :     }
     413             : 
     414          10 :     if (nrelids > 0)
     415             :     {
     416           2 :         OutputPluginPrepareWrite(ctx, true);
     417           4 :         logicalrep_write_truncate(ctx->out,
     418             :                                   nrelids,
     419             :                                   relids,
     420           2 :                                   change->data.truncate.cascade,
     421           2 :                                   change->data.truncate.restart_seqs);
     422           2 :         OutputPluginWrite(ctx, true);
     423             :     }
     424             : 
     425          10 :     MemoryContextSwitchTo(old);
     426          10 :     MemoryContextReset(data->context);
     427          10 : }
     428             : 
     429             : /*
     430             :  * Currently we always forward.
     431             :  */
     432             : static bool
     433       15210 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
     434             :                        RepOriginId origin_id)
     435             : {
     436       15210 :     return false;
     437             : }
     438             : 
     439             : /*
     440             :  * Shutdown the output plugin.
     441             :  *
     442             :  * Note, we don't need to clean the data->context as it's child context
     443             :  * of the ctx->context so it will be cleaned up by logical decoding machinery.
     444             :  */
     445             : static void
     446         116 : pgoutput_shutdown(LogicalDecodingContext *ctx)
     447             : {
     448         116 :     if (RelationSyncCache)
     449             :     {
     450           0 :         hash_destroy(RelationSyncCache);
     451           0 :         RelationSyncCache = NULL;
     452             :     }
     453         116 : }
     454             : 
     455             : /*
     456             :  * Load publications from the list of publication names.
     457             :  */
     458             : static List *
     459          30 : LoadPublications(List *pubnames)
     460             : {
     461          30 :     List       *result = NIL;
     462             :     ListCell   *lc;
     463             : 
     464          62 :     foreach(lc, pubnames)
     465             :     {
     466          32 :         char       *pubname = (char *) lfirst(lc);
     467          32 :         Publication *pub = GetPublicationByName(pubname, false);
     468             : 
     469          32 :         result = lappend(result, pub);
     470             :     }
     471             : 
     472          30 :     return result;
     473             : }
     474             : 
     475             : /*
     476             :  * Publication cache invalidation callback.
     477             :  */
     478             : static void
     479          14 : publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
     480             : {
     481          14 :     publications_valid = false;
     482             : 
     483             :     /*
     484             :      * Also invalidate per-relation cache so that next time the filtering info
     485             :      * is checked it will be updated with the new publication settings.
     486             :      */
     487          14 :     rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
     488          14 : }
     489             : 
     490             : /*
     491             :  * Initialize the relation schema sync cache for a decoding session.
     492             :  *
     493             :  * The hash table is destroyed at the end of a decoding session. While
     494             :  * relcache invalidations still exist and will still be invoked, they
     495             :  * will just see the null hash table global and take no action.
     496             :  */
     497             : static void
     498          38 : init_rel_sync_cache(MemoryContext cachectx)
     499             : {
     500             :     HASHCTL     ctl;
     501             :     MemoryContext old_ctxt;
     502             : 
     503          38 :     if (RelationSyncCache != NULL)
     504           0 :         return;
     505             : 
     506             :     /* Make a new hash table for the cache */
     507          38 :     MemSet(&ctl, 0, sizeof(ctl));
     508          38 :     ctl.keysize = sizeof(Oid);
     509          38 :     ctl.entrysize = sizeof(RelationSyncEntry);
     510          38 :     ctl.hcxt = cachectx;
     511             : 
     512          38 :     old_ctxt = MemoryContextSwitchTo(cachectx);
     513          38 :     RelationSyncCache = hash_create("logical replication output relation cache",
     514             :                                     128, &ctl,
     515             :                                     HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
     516          38 :     (void) MemoryContextSwitchTo(old_ctxt);
     517             : 
     518             :     Assert(RelationSyncCache != NULL);
     519             : 
     520          38 :     CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
     521          38 :     CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
     522             :                                   rel_sync_cache_publication_cb,
     523             :                                   (Datum) 0);
     524             : }
     525             : 
     526             : /*
     527             :  * Find or create entry in the relation schema cache.
     528             :  */
     529             : static RelationSyncEntry *
     530        3516 : get_rel_sync_entry(PGOutputData *data, Oid relid)
     531             : {
     532             :     RelationSyncEntry *entry;
     533             :     bool        found;
     534             :     MemoryContext oldctx;
     535             : 
     536             :     Assert(RelationSyncCache != NULL);
     537             : 
     538             :     /* Find cached function info, creating if not found */
     539        3516 :     oldctx = MemoryContextSwitchTo(CacheMemoryContext);
     540        3516 :     entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
     541             :                                               (void *) &relid,
     542             :                                               HASH_ENTER, &found);
     543        3516 :     MemoryContextSwitchTo(oldctx);
     544             :     Assert(entry != NULL);
     545             : 
     546             :     /* Not found means schema wasn't sent */
     547        3516 :     if (!found || !entry->replicate_valid)
     548             :     {
     549          80 :         List       *pubids = GetRelationPublications(relid);
     550             :         ListCell   *lc;
     551             : 
     552             :         /* Reload publications if needed before use. */
     553          80 :         if (!publications_valid)
     554             :         {
     555          30 :             oldctx = MemoryContextSwitchTo(CacheMemoryContext);
     556          30 :             if (data->publications)
     557           4 :                 list_free_deep(data->publications);
     558             : 
     559          30 :             data->publications = LoadPublications(data->publication_names);
     560          30 :             MemoryContextSwitchTo(oldctx);
     561          30 :             publications_valid = true;
     562             :         }
     563             : 
     564             :         /*
     565             :          * Build publication cache. We can't use one provided by relcache as
     566             :          * relcache considers all publications given relation is in, but here
     567             :          * we only need to consider ones that the subscriber requested.
     568             :          */
     569          80 :         entry->pubactions.pubinsert = entry->pubactions.pubupdate =
     570          80 :             entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
     571             : 
     572         104 :         foreach(lc, data->publications)
     573             :         {
     574          82 :             Publication *pub = lfirst(lc);
     575             : 
     576          82 :             if (pub->alltables || list_member_oid(pubids, pub->oid))
     577             :             {
     578          70 :                 entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
     579          70 :                 entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
     580          70 :                 entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
     581          70 :                 entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
     582             :             }
     583             : 
     584         140 :             if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
     585         116 :                 entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
     586          58 :                 break;
     587             :         }
     588             : 
     589          80 :         list_free(pubids);
     590             : 
     591          80 :         entry->replicate_valid = true;
     592             :     }
     593             : 
     594        3516 :     if (!found)
     595          78 :         entry->schema_sent = false;
     596             : 
     597        3516 :     return entry;
     598             : }
     599             : 
     600             : /*
     601             :  * Relcache invalidation callback: clears schema_sent on the RelationSyncCache entry for relid, if one exists. The arg parameter is not used.
     602             :  */
     603             : static void
     604         800 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
     605             : {
     606             :     RelationSyncEntry *entry;
     607             : 
     608             :     /*
     609             :      * We can get here if the plugin was used through the SQL interface, as
     610             :      * RelationSyncCache is destroyed when the decoding finishes, but there
     611             :      * is no way to unregister the relcache invalidation callback.
     612             :      */
     613         800 :     if (RelationSyncCache == NULL)
     614           0 :         return;
     615             : 
     616             :     /*
     617             :      * Nobody keeps pointers to entries in this hash table around outside
     618             :      * logical decoding callback calls - but invalidation events can come in
     619             :      * *during* a callback if we access the relcache in the callback. Because
     620             :      * of that we must mark the cache entry as invalid but not remove it from
     621             :      * the hash while it could still be referenced, then prune it at a later
     622             :      * safe point.
     623             :      *
     624             :      * Getting invalidations for relations that aren't in the table is
     625             :      * entirely normal, since there's no way to unregister for an invalidation
     626             :      * event. So we don't care if it's found or not.
     627             :      */
     628         800 :     entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
     629             :                                               HASH_FIND, NULL);
     630             : 
     631             :     /*
     632             :      * Reset schema sent status as the relation definition may have changed.
     633             :      */
     634         800 :     if (entry != NULL)
     635         252 :         entry->schema_sent = false;
     636             : }
     637             : 
     638             : /*
     639             :  * Publication relation map syscache invalidation callback: sets replicate_valid to false on every RelationSyncCache entry. The arg, cacheid and hashvalue parameters are not used.
     640             :  */
     641             : static void
     642          22 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
     643             : {
     644             :     HASH_SEQ_STATUS status;
     645             :     RelationSyncEntry *entry;
     646             : 
     647             :     /*
     648             :      * We can get here if the plugin was used through the SQL interface, as
     649             :      * RelationSyncCache is destroyed when the decoding finishes, but there
     650             :      * is no way to unregister the syscache invalidation callback.
     651             :      */
     652          22 :     if (RelationSyncCache == NULL)
     653           0 :         return;
     654             : 
     655             :     /*
     656             :      * There is no way to find which entry in our cache the hash belongs to,
     657             :      * so mark the whole cache as invalid.
     658             :      */
     659          22 :     hash_seq_init(&status, RelationSyncCache);
     660          68 :     while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
     661          24 :         entry->replicate_valid = false;
     662             : }

Generated by: LCOV version 1.13