Line data Source code
1 : /*------------------------------------------------------------------------- 2 : * 3 : * message.c 4 : * Generic logical messages. 5 : * 6 : * Copyright (c) 2013-2025, PostgreSQL Global Development Group 7 : * 8 : * IDENTIFICATION 9 : * src/backend/replication/logical/message.c 10 : * 11 : * NOTES 12 : * 13 : * Generic logical messages allow XLOG logging of arbitrary binary blobs that 14 : * get passed to the logical decoding plugin. In normal XLOG processing they 15 : * are same as NOOP. 16 : * 17 : * These messages can be either transactional or non-transactional. 18 : * Transactional messages are part of current transaction and will be sent to 19 : * decoding plugin using in a same way as DML operations. 20 : * Non-transactional messages are sent to the plugin at the time when the 21 : * logical decoding reads them from XLOG. This also means that transactional 22 : * messages won't be delivered if the transaction was rolled back but the 23 : * non-transactional one will always be delivered. 24 : * 25 : * Every message carries prefix to avoid conflicts between different decoding 26 : * plugins. The plugin authors must take extra care to use unique prefix, 27 : * good options seems to be for example to use the name of the extension. 28 : * 29 : * --------------------------------------------------------------------------- 30 : */ 31 : 32 : #include "postgres.h" 33 : 34 : #include "access/xact.h" 35 : #include "access/xloginsert.h" 36 : #include "miscadmin.h" 37 : #include "replication/message.h" 38 : 39 : /* 40 : * Write logical decoding message into XLog. 41 : */ 42 : XLogRecPtr 43 228 : LogLogicalMessage(const char *prefix, const char *message, size_t size, 44 : bool transactional, bool flush) 45 : { 46 : xl_logical_message xlrec; 47 : XLogRecPtr lsn; 48 : 49 : /* 50 : * Force xid to be allocated if we're emitting a transactional message. 51 : */ 52 228 : if (transactional) 53 : { 54 : Assert(IsTransactionState()); 55 140 : GetCurrentTransactionId(); 56 : } 57 : 58 228 : xlrec.dbId = MyDatabaseId; 59 228 : xlrec.transactional = transactional; 60 : /* trailing zero is critical; see logicalmsg_desc */ 61 228 : xlrec.prefix_size = strlen(prefix) + 1; 62 228 : xlrec.message_size = size; 63 : 64 228 : XLogBeginInsert(); 65 228 : XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage); 66 228 : XLogRegisterData(prefix, xlrec.prefix_size); 67 228 : XLogRegisterData(message, size); 68 : 69 : /* allow origin filtering */ 70 228 : XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); 71 : 72 228 : lsn = XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE); 73 : 74 : /* 75 : * Make sure that the message hits disk before leaving if emitting a 76 : * non-transactional message when flush is requested. 77 : */ 78 228 : if (!transactional && flush) 79 2 : XLogFlush(lsn); 80 228 : return lsn; 81 : } 82 : 83 : /* 84 : * Redo is basically just noop for logical decoding messages. 85 : */ 86 : void 87 176 : logicalmsg_redo(XLogReaderState *record) 88 : { 89 176 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; 90 : 91 176 : if (info != XLOG_LOGICAL_MESSAGE) 92 0 : elog(PANIC, "logicalmsg_redo: unknown op code %u", info); 93 : 94 : /* This is only interesting for logical decoding, see decode.c. */ 95 176 : }