LCOV - code coverage report
Current view: top level - src/backend/replication/logical - message.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 20 21 95.2 %
Date: 2025-01-18 04:15:08 Functions: 2 2 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * message.c
       4             :  *    Generic logical messages.
       5             :  *
       6             :  * Copyright (c) 2013-2025, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/backend/replication/logical/message.c
      10             :  *
      11             :  * NOTES
      12             :  *
      13             :  * Generic logical messages allow XLOG logging of arbitrary binary blobs that
      14             :  * get passed to the logical decoding plugin. In normal XLOG processing they
      15             :  * are same as NOOP.
      16             :  *
      17             :  * These messages can be either transactional or non-transactional.
      18             :  * Transactional messages are part of current transaction and will be sent to
      19             :  * decoding plugin using in a same way as DML operations.
      20             :  * Non-transactional messages are sent to the plugin at the time when the
      21             :  * logical decoding reads them from XLOG. This also means that transactional
      22             :  * messages won't be delivered if the transaction was rolled back but the
      23             :  * non-transactional one will always be delivered.
      24             :  *
      25             :  * Every message carries prefix to avoid conflicts between different decoding
      26             :  * plugins. The plugin authors must take extra care to use unique prefix,
      27             :  * good options seems to be for example to use the name of the extension.
      28             :  *
      29             :  * ---------------------------------------------------------------------------
      30             :  */
      31             : 
      32             : #include "postgres.h"
      33             : 
      34             : #include "access/xact.h"
      35             : #include "access/xloginsert.h"
      36             : #include "miscadmin.h"
      37             : #include "replication/message.h"
      38             : 
      39             : /*
      40             :  * Write logical decoding message into XLog.
      41             :  */
      42             : XLogRecPtr
      43         228 : LogLogicalMessage(const char *prefix, const char *message, size_t size,
      44             :                   bool transactional, bool flush)
      45             : {
      46             :     xl_logical_message xlrec;
      47             :     XLogRecPtr  lsn;
      48             : 
      49             :     /*
      50             :      * Force xid to be allocated if we're emitting a transactional message.
      51             :      */
      52         228 :     if (transactional)
      53             :     {
      54             :         Assert(IsTransactionState());
      55         140 :         GetCurrentTransactionId();
      56             :     }
      57             : 
      58         228 :     xlrec.dbId = MyDatabaseId;
      59         228 :     xlrec.transactional = transactional;
      60             :     /* trailing zero is critical; see logicalmsg_desc */
      61         228 :     xlrec.prefix_size = strlen(prefix) + 1;
      62         228 :     xlrec.message_size = size;
      63             : 
      64         228 :     XLogBeginInsert();
      65         228 :     XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage);
      66         228 :     XLogRegisterData(prefix, xlrec.prefix_size);
      67         228 :     XLogRegisterData(message, size);
      68             : 
      69             :     /* allow origin filtering */
      70         228 :     XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
      71             : 
      72         228 :     lsn = XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
      73             : 
      74             :     /*
      75             :      * Make sure that the message hits disk before leaving if emitting a
      76             :      * non-transactional message when flush is requested.
      77             :      */
      78         228 :     if (!transactional && flush)
      79           2 :         XLogFlush(lsn);
      80         228 :     return lsn;
      81             : }
      82             : 
      83             : /*
      84             :  * Redo is basically just noop for logical decoding messages.
      85             :  */
      86             : void
      87         176 : logicalmsg_redo(XLogReaderState *record)
      88             : {
      89         176 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
      90             : 
      91         176 :     if (info != XLOG_LOGICAL_MESSAGE)
      92           0 :         elog(PANIC, "logicalmsg_redo: unknown op code %u", info);
      93             : 
      94             :     /* This is only interesting for logical decoding, see decode.c. */
      95         176 : }

Generated by: LCOV version 1.14