LCOV - code coverage report
Current view: top level - src/backend/replication/logical - proto.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 447 493 90.7 %
Date: 2024-12-12 19:15:15 Functions: 45 46 97.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * proto.c
       4             :  *      logical replication protocol functions
       5             :  *
       6             :  * Copyright (c) 2015-2024, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *      src/backend/replication/logical/proto.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "access/sysattr.h"
      16             : #include "catalog/pg_namespace.h"
      17             : #include "catalog/pg_type.h"
      18             : #include "libpq/pqformat.h"
      19             : #include "replication/logicalproto.h"
      20             : #include "utils/lsyscache.h"
      21             : #include "utils/syscache.h"
      22             : 
      23             : /*
      24             :  * Protocol message flags.
      25             :  */
      26             : #define LOGICALREP_IS_REPLICA_IDENTITY 1
      27             : 
      28             : #define MESSAGE_TRANSACTIONAL (1<<0)
      29             : #define TRUNCATE_CASCADE        (1<<0)
      30             : #define TRUNCATE_RESTART_SEQS   (1<<1)
      31             : 
      32             : static void logicalrep_write_attrs(StringInfo out, Relation rel,
      33             :                                    Bitmapset *columns, bool include_gencols);
      34             : static void logicalrep_write_tuple(StringInfo out, Relation rel,
      35             :                                    TupleTableSlot *slot,
      36             :                                    bool binary, Bitmapset *columns,
      37             :                                    bool include_gencols);
      38             : static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
      39             : static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
      40             : 
      41             : static void logicalrep_write_namespace(StringInfo out, Oid nspid);
      42             : static const char *logicalrep_read_namespace(StringInfo in);
      43             : 
      44             : /*
      45             :  * Write BEGIN to the output stream.
      46             :  */
      47             : void
      48         820 : logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
      49             : {
      50         820 :     pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
      51             : 
      52             :     /* fixed fields */
      53         820 :     pq_sendint64(out, txn->final_lsn);
      54         820 :     pq_sendint64(out, txn->xact_time.commit_time);
      55         820 :     pq_sendint32(out, txn->xid);
      56         820 : }
      57             : 
      58             : /*
      59             :  * Read transaction BEGIN from the stream.
      60             :  */
      61             : void
      62         886 : logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
      63             : {
      64             :     /* read fields */
      65         886 :     begin_data->final_lsn = pq_getmsgint64(in);
      66         886 :     if (begin_data->final_lsn == InvalidXLogRecPtr)
      67           0 :         elog(ERROR, "final_lsn not set in begin message");
      68         886 :     begin_data->committime = pq_getmsgint64(in);
      69         886 :     begin_data->xid = pq_getmsgint(in, 4);
      70         886 : }
      71             : 
      72             : 
      73             : /*
      74             :  * Write COMMIT to the output stream.
      75             :  */
      76             : void
      77         818 : logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
      78             :                         XLogRecPtr commit_lsn)
      79             : {
      80         818 :     uint8       flags = 0;
      81             : 
      82         818 :     pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
      83             : 
      84             :     /* send the flags field (unused for now) */
      85         818 :     pq_sendbyte(out, flags);
      86             : 
      87             :     /* send fields */
      88         818 :     pq_sendint64(out, commit_lsn);
      89         818 :     pq_sendint64(out, txn->end_lsn);
      90         818 :     pq_sendint64(out, txn->xact_time.commit_time);
      91         818 : }
      92             : 
      93             : /*
      94             :  * Read transaction COMMIT from the stream.
      95             :  */
      96             : void
      97         832 : logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
      98             : {
      99             :     /* read flags (unused for now) */
     100         832 :     uint8       flags = pq_getmsgbyte(in);
     101             : 
     102         832 :     if (flags != 0)
     103           0 :         elog(ERROR, "unrecognized flags %u in commit message", flags);
     104             : 
     105             :     /* read fields */
     106         832 :     commit_data->commit_lsn = pq_getmsgint64(in);
     107         832 :     commit_data->end_lsn = pq_getmsgint64(in);
     108         832 :     commit_data->committime = pq_getmsgint64(in);
     109         832 : }
     110             : 
     111             : /*
     112             :  * Write BEGIN PREPARE to the output stream.
     113             :  */
     114             : void
     115          40 : logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
     116             : {
     117          40 :     pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE);
     118             : 
     119             :     /* fixed fields */
     120          40 :     pq_sendint64(out, txn->final_lsn);
     121          40 :     pq_sendint64(out, txn->end_lsn);
     122          40 :     pq_sendint64(out, txn->xact_time.prepare_time);
     123          40 :     pq_sendint32(out, txn->xid);
     124             : 
     125             :     /* send gid */
     126          40 :     pq_sendstring(out, txn->gid);
     127          40 : }
     128             : 
     129             : /*
     130             :  * Read transaction BEGIN PREPARE from the stream.
     131             :  */
     132             : void
     133          32 : logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
     134             : {
     135             :     /* read fields */
     136          32 :     begin_data->prepare_lsn = pq_getmsgint64(in);
     137          32 :     if (begin_data->prepare_lsn == InvalidXLogRecPtr)
     138           0 :         elog(ERROR, "prepare_lsn not set in begin prepare message");
     139          32 :     begin_data->end_lsn = pq_getmsgint64(in);
     140          32 :     if (begin_data->end_lsn == InvalidXLogRecPtr)
     141           0 :         elog(ERROR, "end_lsn not set in begin prepare message");
     142          32 :     begin_data->prepare_time = pq_getmsgint64(in);
     143          32 :     begin_data->xid = pq_getmsgint(in, 4);
     144             : 
     145             :     /* read gid (copy it into a pre-allocated buffer) */
     146          32 :     strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
     147          32 : }
     148             : 
     149             : /*
     150             :  * The core functionality for logicalrep_write_prepare and
     151             :  * logicalrep_write_stream_prepare.
     152             :  */
     153             : static void
     154          68 : logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
     155             :                                 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
     156             : {
     157          68 :     uint8       flags = 0;
     158             : 
     159          68 :     pq_sendbyte(out, type);
     160             : 
     161             :     /*
     162             :      * This should only ever happen for two-phase commit transactions, in
     163             :      * which case we expect to have a valid GID.
     164             :      */
     165             :     Assert(txn->gid != NULL);
     166             :     Assert(rbtxn_prepared(txn));
     167             :     Assert(TransactionIdIsValid(txn->xid));
     168             : 
     169             :     /* send the flags field */
     170          68 :     pq_sendbyte(out, flags);
     171             : 
     172             :     /* send fields */
     173          68 :     pq_sendint64(out, prepare_lsn);
     174          68 :     pq_sendint64(out, txn->end_lsn);
     175          68 :     pq_sendint64(out, txn->xact_time.prepare_time);
     176          68 :     pq_sendint32(out, txn->xid);
     177             : 
     178             :     /* send gid */
     179          68 :     pq_sendstring(out, txn->gid);
     180          68 : }
     181             : 
     182             : /*
     183             :  * Write PREPARE to the output stream.
     184             :  */
     185             : void
     186          40 : logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
     187             :                          XLogRecPtr prepare_lsn)
     188             : {
     189          40 :     logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
     190             :                                     txn, prepare_lsn);
     191          40 : }
     192             : 
     193             : /*
     194             :  * The core functionality for logicalrep_read_prepare and
     195             :  * logicalrep_read_stream_prepare.
     196             :  */
     197             : static void
     198          52 : logicalrep_read_prepare_common(StringInfo in, char *msgtype,
     199             :                                LogicalRepPreparedTxnData *prepare_data)
     200             : {
     201             :     /* read flags */
     202          52 :     uint8       flags = pq_getmsgbyte(in);
     203             : 
     204          52 :     if (flags != 0)
     205           0 :         elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
     206             : 
     207             :     /* read fields */
     208          52 :     prepare_data->prepare_lsn = pq_getmsgint64(in);
     209          52 :     if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
     210           0 :         elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
     211          52 :     prepare_data->end_lsn = pq_getmsgint64(in);
     212          52 :     if (prepare_data->end_lsn == InvalidXLogRecPtr)
     213           0 :         elog(ERROR, "end_lsn is not set in %s message", msgtype);
     214          52 :     prepare_data->prepare_time = pq_getmsgint64(in);
     215          52 :     prepare_data->xid = pq_getmsgint(in, 4);
     216          52 :     if (prepare_data->xid == InvalidTransactionId)
     217           0 :         elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
     218             : 
     219             :     /* read gid (copy it into a pre-allocated buffer) */
     220          52 :     strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
     221          52 : }
     222             : 
     223             : /*
     224             :  * Read transaction PREPARE from the stream.
     225             :  */
     226             : void
     227          30 : logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
     228             : {
     229          30 :     logicalrep_read_prepare_common(in, "prepare", prepare_data);
     230          30 : }
     231             : 
     232             : /*
     233             :  * Write COMMIT PREPARED to the output stream.
     234             :  */
     235             : void
     236          48 : logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
     237             :                                  XLogRecPtr commit_lsn)
     238             : {
     239          48 :     uint8       flags = 0;
     240             : 
     241          48 :     pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
     242             : 
     243             :     /*
     244             :      * This should only ever happen for two-phase commit transactions, in
     245             :      * which case we expect to have a valid GID.
     246             :      */
     247             :     Assert(txn->gid != NULL);
     248             : 
     249             :     /* send the flags field */
     250          48 :     pq_sendbyte(out, flags);
     251             : 
     252             :     /* send fields */
     253          48 :     pq_sendint64(out, commit_lsn);
     254          48 :     pq_sendint64(out, txn->end_lsn);
     255          48 :     pq_sendint64(out, txn->xact_time.commit_time);
     256          48 :     pq_sendint32(out, txn->xid);
     257             : 
     258             :     /* send gid */
     259          48 :     pq_sendstring(out, txn->gid);
     260          48 : }
     261             : 
     262             : /*
     263             :  * Read transaction COMMIT PREPARED from the stream.
     264             :  */
     265             : void
     266          40 : logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
     267             : {
     268             :     /* read flags */
     269          40 :     uint8       flags = pq_getmsgbyte(in);
     270             : 
     271          40 :     if (flags != 0)
     272           0 :         elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
     273             : 
     274             :     /* read fields */
     275          40 :     prepare_data->commit_lsn = pq_getmsgint64(in);
     276          40 :     if (prepare_data->commit_lsn == InvalidXLogRecPtr)
     277           0 :         elog(ERROR, "commit_lsn is not set in commit prepared message");
     278          40 :     prepare_data->end_lsn = pq_getmsgint64(in);
     279          40 :     if (prepare_data->end_lsn == InvalidXLogRecPtr)
     280           0 :         elog(ERROR, "end_lsn is not set in commit prepared message");
     281          40 :     prepare_data->commit_time = pq_getmsgint64(in);
     282          40 :     prepare_data->xid = pq_getmsgint(in, 4);
     283             : 
     284             :     /* read gid (copy it into a pre-allocated buffer) */
     285          40 :     strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
     286          40 : }
     287             : 
     288             : /*
     289             :  * Write ROLLBACK PREPARED to the output stream.
     290             :  */
     291             : void
     292          18 : logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
     293             :                                    XLogRecPtr prepare_end_lsn,
     294             :                                    TimestampTz prepare_time)
     295             : {
     296          18 :     uint8       flags = 0;
     297             : 
     298          18 :     pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
     299             : 
     300             :     /*
     301             :      * This should only ever happen for two-phase commit transactions, in
     302             :      * which case we expect to have a valid GID.
     303             :      */
     304             :     Assert(txn->gid != NULL);
     305             : 
     306             :     /* send the flags field */
     307          18 :     pq_sendbyte(out, flags);
     308             : 
     309             :     /* send fields */
     310          18 :     pq_sendint64(out, prepare_end_lsn);
     311          18 :     pq_sendint64(out, txn->end_lsn);
     312          18 :     pq_sendint64(out, prepare_time);
     313          18 :     pq_sendint64(out, txn->xact_time.commit_time);
     314          18 :     pq_sendint32(out, txn->xid);
     315             : 
     316             :     /* send gid */
     317          18 :     pq_sendstring(out, txn->gid);
     318          18 : }
     319             : 
     320             : /*
     321             :  * Read transaction ROLLBACK PREPARED from the stream.
     322             :  */
     323             : void
     324          10 : logicalrep_read_rollback_prepared(StringInfo in,
     325             :                                   LogicalRepRollbackPreparedTxnData *rollback_data)
     326             : {
     327             :     /* read flags */
     328          10 :     uint8       flags = pq_getmsgbyte(in);
     329             : 
     330          10 :     if (flags != 0)
     331           0 :         elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
     332             : 
     333             :     /* read fields */
     334          10 :     rollback_data->prepare_end_lsn = pq_getmsgint64(in);
     335          10 :     if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
     336           0 :         elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
     337          10 :     rollback_data->rollback_end_lsn = pq_getmsgint64(in);
     338          10 :     if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
     339           0 :         elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
     340          10 :     rollback_data->prepare_time = pq_getmsgint64(in);
     341          10 :     rollback_data->rollback_time = pq_getmsgint64(in);
     342          10 :     rollback_data->xid = pq_getmsgint(in, 4);
     343             : 
     344             :     /* read gid (copy it into a pre-allocated buffer) */
     345          10 :     strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
     346          10 : }
     347             : 
     348             : /*
     349             :  * Write STREAM PREPARE to the output stream.
     350             :  */
     351             : void
     352          28 : logicalrep_write_stream_prepare(StringInfo out,
     353             :                                 ReorderBufferTXN *txn,
     354             :                                 XLogRecPtr prepare_lsn)
     355             : {
     356          28 :     logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE,
     357             :                                     txn, prepare_lsn);
     358          28 : }
     359             : 
     360             : /*
     361             :  * Read STREAM PREPARE from the stream.
     362             :  */
     363             : void
     364          22 : logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
     365             : {
     366          22 :     logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
     367          22 : }
     368             : 
     369             : /*
     370             :  * Write ORIGIN to the output stream.
     371             :  */
     372             : void
     373          18 : logicalrep_write_origin(StringInfo out, const char *origin,
     374             :                         XLogRecPtr origin_lsn)
     375             : {
     376          18 :     pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
     377             : 
     378             :     /* fixed fields */
     379          18 :     pq_sendint64(out, origin_lsn);
     380             : 
     381             :     /* origin string */
     382          18 :     pq_sendstring(out, origin);
     383          18 : }
     384             : 
     385             : /*
     386             :  * Read ORIGIN from the output stream.
     387             :  */
     388             : char *
     389           0 : logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
     390             : {
     391             :     /* fixed fields */
     392           0 :     *origin_lsn = pq_getmsgint64(in);
     393             : 
     394             :     /* return origin */
     395           0 :     return pstrdup(pq_getmsgstring(in));
     396             : }
     397             : 
     398             : /*
     399             :  * Write INSERT to the output stream.
     400             :  */
     401             : void
     402      211774 : logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
     403             :                         TupleTableSlot *newslot, bool binary,
     404             :                         Bitmapset *columns, bool include_gencols)
     405             : {
     406      211774 :     pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
     407             : 
     408             :     /* transaction ID (if not valid, we're not streaming) */
     409      211774 :     if (TransactionIdIsValid(xid))
     410      200168 :         pq_sendint32(out, xid);
     411             : 
     412             :     /* use Oid as relation identifier */
     413      211774 :     pq_sendint32(out, RelationGetRelid(rel));
     414             : 
     415      211774 :     pq_sendbyte(out, 'N');      /* new tuple follows */
     416      211774 :     logicalrep_write_tuple(out, rel, newslot, binary, columns, include_gencols);
     417      211774 : }
     418             : 
     419             : /*
     420             :  * Read INSERT from stream.
     421             :  *
     422             :  * Fills the new tuple.
     423             :  */
     424             : LogicalRepRelId
     425      153244 : logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
     426             : {
     427             :     char        action;
     428             :     LogicalRepRelId relid;
     429             : 
     430             :     /* read the relation id */
     431      153244 :     relid = pq_getmsgint(in, 4);
     432             : 
     433      153244 :     action = pq_getmsgbyte(in);
     434      153244 :     if (action != 'N')
     435           0 :         elog(ERROR, "expected new tuple but got %d",
     436             :              action);
     437             : 
     438      153244 :     logicalrep_read_tuple(in, newtup);
     439             : 
     440      153244 :     return relid;
     441             : }
     442             : 
     443             : /*
     444             :  * Write UPDATE to the output stream.
     445             :  */
     446             : void
     447       68878 : logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
     448             :                         TupleTableSlot *oldslot, TupleTableSlot *newslot,
     449             :                         bool binary, Bitmapset *columns, bool include_gencols)
     450             : {
     451       68878 :     pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
     452             : 
     453             :     Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
     454             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     455             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
     456             : 
     457             :     /* transaction ID (if not valid, we're not streaming) */
     458       68878 :     if (TransactionIdIsValid(xid))
     459       68464 :         pq_sendint32(out, xid);
     460             : 
     461             :     /* use Oid as relation identifier */
     462       68878 :     pq_sendint32(out, RelationGetRelid(rel));
     463             : 
     464       68878 :     if (oldslot != NULL)
     465             :     {
     466         254 :         if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     467         122 :             pq_sendbyte(out, 'O');  /* old tuple follows */
     468             :         else
     469         132 :             pq_sendbyte(out, 'K');  /* old key follows */
     470         254 :         logicalrep_write_tuple(out, rel, oldslot, binary, columns,
     471             :                                include_gencols);
     472             :     }
     473             : 
     474       68878 :     pq_sendbyte(out, 'N');      /* new tuple follows */
     475       68878 :     logicalrep_write_tuple(out, rel, newslot, binary, columns, include_gencols);
     476       68878 : }
     477             : 
     478             : /*
     479             :  * Read UPDATE from stream.
     480             :  */
     481             : LogicalRepRelId
     482       63866 : logicalrep_read_update(StringInfo in, bool *has_oldtuple,
     483             :                        LogicalRepTupleData *oldtup,
     484             :                        LogicalRepTupleData *newtup)
     485             : {
     486             :     char        action;
     487             :     LogicalRepRelId relid;
     488             : 
     489             :     /* read the relation id */
     490       63866 :     relid = pq_getmsgint(in, 4);
     491             : 
     492             :     /* read and verify action */
     493       63866 :     action = pq_getmsgbyte(in);
     494       63866 :     if (action != 'K' && action != 'O' && action != 'N')
     495           0 :         elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
     496             :              action);
     497             : 
     498             :     /* check for old tuple */
     499       63866 :     if (action == 'K' || action == 'O')
     500             :     {
     501         258 :         logicalrep_read_tuple(in, oldtup);
     502         258 :         *has_oldtuple = true;
     503             : 
     504         258 :         action = pq_getmsgbyte(in);
     505             :     }
     506             :     else
     507       63608 :         *has_oldtuple = false;
     508             : 
     509             :     /* check for new  tuple */
     510       63866 :     if (action != 'N')
     511           0 :         elog(ERROR, "expected action 'N', got %c",
     512             :              action);
     513             : 
     514       63866 :     logicalrep_read_tuple(in, newtup);
     515             : 
     516       63866 :     return relid;
     517             : }
     518             : 
     519             : /*
     520             :  * Write DELETE to the output stream.
     521             :  */
     522             : void
     523       83766 : logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
     524             :                         TupleTableSlot *oldslot, bool binary,
     525             :                         Bitmapset *columns, bool include_gencols)
     526             : {
     527             :     Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
     528             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     529             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
     530             : 
     531       83766 :     pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
     532             : 
     533             :     /* transaction ID (if not valid, we're not streaming) */
     534       83766 :     if (TransactionIdIsValid(xid))
     535       83250 :         pq_sendint32(out, xid);
     536             : 
     537             :     /* use Oid as relation identifier */
     538       83766 :     pq_sendint32(out, RelationGetRelid(rel));
     539             : 
     540       83766 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     541         254 :         pq_sendbyte(out, 'O');  /* old tuple follows */
     542             :     else
     543       83512 :         pq_sendbyte(out, 'K');  /* old key follows */
     544             : 
     545       83766 :     logicalrep_write_tuple(out, rel, oldslot, binary, columns, include_gencols);
     546       83766 : }
     547             : 
     548             : /*
     549             :  * Read DELETE from stream.
     550             :  *
     551             :  * Fills the old tuple.
     552             :  */
     553             : LogicalRepRelId
     554       80636 : logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
     555             : {
     556             :     char        action;
     557             :     LogicalRepRelId relid;
     558             : 
     559             :     /* read the relation id */
     560       80636 :     relid = pq_getmsgint(in, 4);
     561             : 
     562             :     /* read and verify action */
     563       80636 :     action = pq_getmsgbyte(in);
     564       80636 :     if (action != 'K' && action != 'O')
     565           0 :         elog(ERROR, "expected action 'O' or 'K', got %c", action);
     566             : 
     567       80636 :     logicalrep_read_tuple(in, oldtup);
     568             : 
     569       80636 :     return relid;
     570             : }
     571             : 
     572             : /*
     573             :  * Write TRUNCATE to the output stream.
     574             :  */
     575             : void
     576          20 : logicalrep_write_truncate(StringInfo out,
     577             :                           TransactionId xid,
     578             :                           int nrelids,
     579             :                           Oid relids[],
     580             :                           bool cascade, bool restart_seqs)
     581             : {
     582             :     int         i;
     583          20 :     uint8       flags = 0;
     584             : 
     585          20 :     pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
     586             : 
     587             :     /* transaction ID (if not valid, we're not streaming) */
     588          20 :     if (TransactionIdIsValid(xid))
     589           0 :         pq_sendint32(out, xid);
     590             : 
     591          20 :     pq_sendint32(out, nrelids);
     592             : 
     593             :     /* encode and send truncate flags */
     594          20 :     if (cascade)
     595           0 :         flags |= TRUNCATE_CASCADE;
     596          20 :     if (restart_seqs)
     597           0 :         flags |= TRUNCATE_RESTART_SEQS;
     598          20 :     pq_sendint8(out, flags);
     599             : 
     600          52 :     for (i = 0; i < nrelids; i++)
     601          32 :         pq_sendint32(out, relids[i]);
     602          20 : }
     603             : 
     604             : /*
     605             :  * Read TRUNCATE from stream.
     606             :  */
     607             : List *
     608          38 : logicalrep_read_truncate(StringInfo in,
     609             :                          bool *cascade, bool *restart_seqs)
     610             : {
     611             :     int         i;
     612             :     int         nrelids;
     613          38 :     List       *relids = NIL;
     614             :     uint8       flags;
     615             : 
     616          38 :     nrelids = pq_getmsgint(in, 4);
     617             : 
     618             :     /* read and decode truncate flags */
     619          38 :     flags = pq_getmsgint(in, 1);
     620          38 :     *cascade = (flags & TRUNCATE_CASCADE) > 0;
     621          38 :     *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
     622             : 
     623          94 :     for (i = 0; i < nrelids; i++)
     624          56 :         relids = lappend_oid(relids, pq_getmsgint(in, 4));
     625             : 
     626          38 :     return relids;
     627             : }
     628             : 
     629             : /*
     630             :  * Write MESSAGE to stream
     631             :  */
     632             : void
     633          10 : logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
     634             :                          bool transactional, const char *prefix, Size sz,
     635             :                          const char *message)
     636             : {
     637          10 :     uint8       flags = 0;
     638             : 
     639          10 :     pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
     640             : 
     641             :     /* encode and send message flags */
     642          10 :     if (transactional)
     643           4 :         flags |= MESSAGE_TRANSACTIONAL;
     644             : 
     645             :     /* transaction ID (if not valid, we're not streaming) */
     646          10 :     if (TransactionIdIsValid(xid))
     647           0 :         pq_sendint32(out, xid);
     648             : 
     649          10 :     pq_sendint8(out, flags);
     650          10 :     pq_sendint64(out, lsn);
     651          10 :     pq_sendstring(out, prefix);
     652          10 :     pq_sendint32(out, sz);
     653          10 :     pq_sendbytes(out, message, sz);
     654          10 : }
     655             : 
     656             : /*
     657             :  * Write relation description to the output stream.
     658             :  */
     659             : void
     660         678 : logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
     661             :                      Bitmapset *columns, bool include_gencols)
     662             : {
     663             :     char       *relname;
     664             : 
     665         678 :     pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
     666             : 
     667             :     /* transaction ID (if not valid, we're not streaming) */
     668         678 :     if (TransactionIdIsValid(xid))
     669         142 :         pq_sendint32(out, xid);
     670             : 
     671             :     /* use Oid as relation identifier */
     672         678 :     pq_sendint32(out, RelationGetRelid(rel));
     673             : 
     674             :     /* send qualified relation name */
     675         678 :     logicalrep_write_namespace(out, RelationGetNamespace(rel));
     676         678 :     relname = RelationGetRelationName(rel);
     677         678 :     pq_sendstring(out, relname);
     678             : 
     679             :     /* send replica identity */
     680         678 :     pq_sendbyte(out, rel->rd_rel->relreplident);
     681             : 
     682             :     /* send the attribute info */
     683         678 :     logicalrep_write_attrs(out, rel, columns, include_gencols);
     684         678 : }
     685             : 
     686             : /*
     687             :  * Read the relation info from stream and return as LogicalRepRelation.
     688             :  */
     689             : LogicalRepRelation *
     690         784 : logicalrep_read_rel(StringInfo in)
     691             : {
     692         784 :     LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
     693             : 
     694         784 :     rel->remoteid = pq_getmsgint(in, 4);
     695             : 
     696             :     /* Read relation name from stream */
     697         784 :     rel->nspname = pstrdup(logicalrep_read_namespace(in));
     698         784 :     rel->relname = pstrdup(pq_getmsgstring(in));
     699             : 
     700             :     /* Read the replica identity. */
     701         784 :     rel->replident = pq_getmsgbyte(in);
     702             : 
     703             :     /* Get attribute description */
     704         784 :     logicalrep_read_attrs(in, rel);
     705             : 
     706         784 :     return rel;
     707             : }
     708             : 
     709             : /*
     710             :  * Write type info to the output stream.
     711             :  *
     712             :  * This function will always write base type info.
     713             :  */
     714             : void
     715          36 : logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
     716             : {
     717          36 :     Oid         basetypoid = getBaseType(typoid);
     718             :     HeapTuple   tup;
     719             :     Form_pg_type typtup;
     720             : 
     721          36 :     pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
     722             : 
     723             :     /* transaction ID (if not valid, we're not streaming) */
     724          36 :     if (TransactionIdIsValid(xid))
     725           0 :         pq_sendint32(out, xid);
     726             : 
     727          36 :     tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
     728          36 :     if (!HeapTupleIsValid(tup))
     729           0 :         elog(ERROR, "cache lookup failed for type %u", basetypoid);
     730          36 :     typtup = (Form_pg_type) GETSTRUCT(tup);
     731             : 
     732             :     /* use Oid as type identifier */
     733          36 :     pq_sendint32(out, typoid);
     734             : 
     735             :     /* send qualified type name */
     736          36 :     logicalrep_write_namespace(out, typtup->typnamespace);
     737          36 :     pq_sendstring(out, NameStr(typtup->typname));
     738             : 
     739          36 :     ReleaseSysCache(tup);
     740          36 : }
     741             : 
     742             : /*
     743             :  * Read type info from the output stream.
     744             :  */
     745             : void
     746          36 : logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
     747             : {
     748          36 :     ltyp->remoteid = pq_getmsgint(in, 4);
     749             : 
     750             :     /* Read type name from stream */
     751          36 :     ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
     752          36 :     ltyp->typname = pstrdup(pq_getmsgstring(in));
     753          36 : }
     754             : 
     755             : /*
     756             :  * Write a tuple to the outputstream, in the most efficient format possible.
     757             :  */
     758             : static void
     759      364672 : logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
     760             :                        bool binary, Bitmapset *columns, bool include_gencols)
     761             : {
     762             :     TupleDesc   desc;
     763             :     Datum      *values;
     764             :     bool       *isnull;
     765             :     int         i;
     766      364672 :     uint16      nliveatts = 0;
     767             : 
     768      364672 :     desc = RelationGetDescr(rel);
     769             : 
     770     1096984 :     for (i = 0; i < desc->natts; i++)
     771             :     {
     772      732312 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     773             : 
     774      732312 :         if (!logicalrep_should_publish_column(att, columns, include_gencols))
     775         262 :             continue;
     776             : 
     777      732050 :         nliveatts++;
     778             :     }
     779      364672 :     pq_sendint16(out, nliveatts);
     780             : 
     781      364672 :     slot_getallattrs(slot);
     782      364672 :     values = slot->tts_values;
     783      364672 :     isnull = slot->tts_isnull;
     784             : 
     785             :     /* Write the values */
     786     1096984 :     for (i = 0; i < desc->natts; i++)
     787             :     {
     788             :         HeapTuple   typtup;
     789             :         Form_pg_type typclass;
     790      732312 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     791             : 
     792      732312 :         if (!logicalrep_should_publish_column(att, columns, include_gencols))
     793         262 :             continue;
     794             : 
     795      732050 :         if (isnull[i])
     796             :         {
     797      103826 :             pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
     798      103826 :             continue;
     799             :         }
     800             : 
     801      628224 :         if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
     802             :         {
     803             :             /*
     804             :              * Unchanged toasted datum.  (Note that we don't promise to detect
     805             :              * unchanged data in general; this is just a cheap check to avoid
     806             :              * sending large values unnecessarily.)
     807             :              */
     808           6 :             pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
     809           6 :             continue;
     810             :         }
     811             : 
     812      628218 :         typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
     813      628218 :         if (!HeapTupleIsValid(typtup))
     814           0 :             elog(ERROR, "cache lookup failed for type %u", att->atttypid);
     815      628218 :         typclass = (Form_pg_type) GETSTRUCT(typtup);
     816             : 
     817             :         /*
     818             :          * Send in binary if requested and type has suitable send function.
     819             :          */
     820      628218 :         if (binary && OidIsValid(typclass->typsend))
     821      230090 :         {
     822             :             bytea      *outputbytes;
     823             :             int         len;
     824             : 
     825      230090 :             pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
     826      230090 :             outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
     827      230090 :             len = VARSIZE(outputbytes) - VARHDRSZ;
     828      230090 :             pq_sendint(out, len, 4);    /* length */
     829      230090 :             pq_sendbytes(out, VARDATA(outputbytes), len);   /* data */
     830      230090 :             pfree(outputbytes);
     831             :         }
     832             :         else
     833             :         {
     834             :             char       *outputstr;
     835             : 
     836      398128 :             pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
     837      398128 :             outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
     838      398128 :             pq_sendcountedtext(out, outputstr, strlen(outputstr));
     839      398128 :             pfree(outputstr);
     840             :         }
     841             : 
     842      628218 :         ReleaseSysCache(typtup);
     843             :     }
     844      364672 : }
     845             : 
     846             : /*
     847             :  * Read tuple in logical replication format from stream.
     848             :  */
     849             : static void
     850      298004 : logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
     851             : {
     852             :     int         i;
     853             :     int         natts;
     854             : 
     855             :     /* Get number of attributes */
     856      298004 :     natts = pq_getmsgint(in, 2);
     857             : 
     858             :     /* Allocate space for per-column values; zero out unused StringInfoDatas */
     859      298004 :     tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
     860      298004 :     tuple->colstatus = (char *) palloc(natts * sizeof(char));
     861      298004 :     tuple->ncols = natts;
     862             : 
     863             :     /* Read the data */
     864      905236 :     for (i = 0; i < natts; i++)
     865             :     {
     866             :         char       *buff;
     867             :         char        kind;
     868             :         int         len;
     869      607232 :         StringInfo  value = &tuple->colvalues[i];
     870             : 
     871      607232 :         kind = pq_getmsgbyte(in);
     872      607232 :         tuple->colstatus[i] = kind;
     873             : 
     874      607232 :         switch (kind)
     875             :         {
     876      100718 :             case LOGICALREP_COLUMN_NULL:
     877             :                 /* nothing more to do */
     878      100718 :                 break;
     879           6 :             case LOGICALREP_COLUMN_UNCHANGED:
     880             :                 /* we don't receive the value of an unchanged column */
     881           6 :                 break;
     882      506508 :             case LOGICALREP_COLUMN_TEXT:
     883             :             case LOGICALREP_COLUMN_BINARY:
     884      506508 :                 len = pq_getmsgint(in, 4);  /* read length */
     885             : 
     886             :                 /* and data */
     887      506508 :                 buff = palloc(len + 1);
     888      506508 :                 pq_copymsgbytes(in, buff, len);
     889             : 
     890             :                 /*
     891             :                  * NUL termination is required for LOGICALREP_COLUMN_TEXT mode
     892             :                  * as input functions require that.  For
     893             :                  * LOGICALREP_COLUMN_BINARY it's not technically required, but
     894             :                  * it's harmless.
     895             :                  */
     896      506508 :                 buff[len] = '\0';
     897             : 
     898      506508 :                 initStringInfoFromString(value, buff, len);
     899      506508 :                 break;
     900           0 :             default:
     901           0 :                 elog(ERROR, "unrecognized data representation type '%c'", kind);
     902             :         }
     903             :     }
     904      298004 : }
     905             : 
     906             : /*
     907             :  * Write relation attribute metadata to the stream.
     908             :  */
     909             : static void
     910         678 : logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns,
     911             :                        bool include_gencols)
     912             : {
     913             :     TupleDesc   desc;
     914             :     int         i;
     915         678 :     uint16      nliveatts = 0;
     916         678 :     Bitmapset  *idattrs = NULL;
     917             :     bool        replidentfull;
     918             : 
     919         678 :     desc = RelationGetDescr(rel);
     920             : 
     921             :     /* send number of live attributes */
     922        2064 :     for (i = 0; i < desc->natts; i++)
     923             :     {
     924        1386 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     925             : 
     926        1386 :         if (!logicalrep_should_publish_column(att, columns, include_gencols))
     927         136 :             continue;
     928             : 
     929        1250 :         nliveatts++;
     930             :     }
     931         678 :     pq_sendint16(out, nliveatts);
     932             : 
     933             :     /* fetch bitmap of REPLICATION IDENTITY attributes */
     934         678 :     replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
     935         678 :     if (!replidentfull)
     936         576 :         idattrs = RelationGetIdentityKeyBitmap(rel);
     937             : 
     938             :     /* send the attributes */
     939        2064 :     for (i = 0; i < desc->natts; i++)
     940             :     {
     941        1386 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     942        1386 :         uint8       flags = 0;
     943             : 
     944        1386 :         if (!logicalrep_should_publish_column(att, columns, include_gencols))
     945         136 :             continue;
     946             : 
     947             :         /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
     948        2348 :         if (replidentfull ||
     949        1098 :             bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
     950             :                           idattrs))
     951         616 :             flags |= LOGICALREP_IS_REPLICA_IDENTITY;
     952             : 
     953        1250 :         pq_sendbyte(out, flags);
     954             : 
     955             :         /* attribute name */
     956        1250 :         pq_sendstring(out, NameStr(att->attname));
     957             : 
     958             :         /* attribute type id */
     959        1250 :         pq_sendint32(out, (int) att->atttypid);
     960             : 
     961             :         /* attribute mode */
     962        1250 :         pq_sendint32(out, att->atttypmod);
     963             :     }
     964             : 
     965         678 :     bms_free(idattrs);
     966         678 : }
     967             : 
     968             : /*
     969             :  * Read relation attribute metadata from the stream.
     970             :  */
     971             : static void
     972         784 : logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
     973             : {
     974             :     int         i;
     975             :     int         natts;
     976             :     char      **attnames;
     977             :     Oid        *atttyps;
     978         784 :     Bitmapset  *attkeys = NULL;
     979             : 
     980         784 :     natts = pq_getmsgint(in, 2);
     981         784 :     attnames = palloc(natts * sizeof(char *));
     982         784 :     atttyps = palloc(natts * sizeof(Oid));
     983             : 
     984             :     /* read the attributes */
     985        2214 :     for (i = 0; i < natts; i++)
     986             :     {
     987             :         uint8       flags;
     988             : 
     989             :         /* Check for replica identity column */
     990        1430 :         flags = pq_getmsgbyte(in);
     991        1430 :         if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
     992         692 :             attkeys = bms_add_member(attkeys, i);
     993             : 
     994             :         /* attribute name */
     995        1430 :         attnames[i] = pstrdup(pq_getmsgstring(in));
     996             : 
     997             :         /* attribute type id */
     998        1430 :         atttyps[i] = (Oid) pq_getmsgint(in, 4);
     999             : 
    1000             :         /* we ignore attribute mode for now */
    1001        1430 :         (void) pq_getmsgint(in, 4);
    1002             :     }
    1003             : 
    1004         784 :     rel->attnames = attnames;
    1005         784 :     rel->atttyps = atttyps;
    1006         784 :     rel->attkeys = attkeys;
    1007         784 :     rel->natts = natts;
    1008         784 : }
    1009             : 
    1010             : /*
    1011             :  * Write the namespace name or empty string for pg_catalog (to save space).
    1012             :  */
    1013             : static void
    1014         714 : logicalrep_write_namespace(StringInfo out, Oid nspid)
    1015             : {
    1016         714 :     if (nspid == PG_CATALOG_NAMESPACE)
    1017           2 :         pq_sendbyte(out, '\0');
    1018             :     else
    1019             :     {
    1020         712 :         char       *nspname = get_namespace_name(nspid);
    1021             : 
    1022         712 :         if (nspname == NULL)
    1023           0 :             elog(ERROR, "cache lookup failed for namespace %u",
    1024             :                  nspid);
    1025             : 
    1026         712 :         pq_sendstring(out, nspname);
    1027             :     }
    1028         714 : }
    1029             : 
    1030             : /*
    1031             :  * Read the namespace name while treating empty string as pg_catalog.
    1032             :  */
    1033             : static const char *
    1034         820 : logicalrep_read_namespace(StringInfo in)
    1035             : {
    1036         820 :     const char *nspname = pq_getmsgstring(in);
    1037             : 
    1038         820 :     if (nspname[0] == '\0')
    1039           2 :         nspname = "pg_catalog";
    1040             : 
    1041         820 :     return nspname;
    1042             : }
    1043             : 
    1044             : /*
    1045             :  * Write the information for the start stream message to the output stream.
    1046             :  */
    1047             : void
    1048        1340 : logicalrep_write_stream_start(StringInfo out,
    1049             :                               TransactionId xid, bool first_segment)
    1050             : {
    1051        1340 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
    1052             : 
    1053             :     Assert(TransactionIdIsValid(xid));
    1054             : 
    1055             :     /* transaction ID (we're starting to stream, so must be valid) */
    1056        1340 :     pq_sendint32(out, xid);
    1057             : 
    1058             :     /* 1 if this is the first streaming segment for this xid */
    1059        1340 :     pq_sendbyte(out, first_segment ? 1 : 0);
    1060        1340 : }
    1061             : 
    1062             : /*
    1063             :  * Read the information about the start stream message from output stream.
    1064             :  */
    1065             : TransactionId
    1066        1760 : logicalrep_read_stream_start(StringInfo in, bool *first_segment)
    1067             : {
    1068             :     TransactionId xid;
    1069             : 
    1070             :     Assert(first_segment);
    1071             : 
    1072        1760 :     xid = pq_getmsgint(in, 4);
    1073        1760 :     *first_segment = (pq_getmsgbyte(in) == 1);
    1074             : 
    1075        1760 :     return xid;
    1076             : }
    1077             : 
    1078             : /*
    1079             :  * Write the stop stream message to the output stream.
    1080             :  */
    1081             : void
    1082        1340 : logicalrep_write_stream_stop(StringInfo out)
    1083             : {
    1084        1340 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_STOP);
    1085        1340 : }
    1086             : 
    1087             : /*
    1088             :  * Write STREAM COMMIT to the output stream.
    1089             :  */
    1090             : void
    1091          92 : logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
    1092             :                                XLogRecPtr commit_lsn)
    1093             : {
    1094          92 :     uint8       flags = 0;
    1095             : 
    1096          92 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
    1097             : 
    1098             :     Assert(TransactionIdIsValid(txn->xid));
    1099             : 
    1100             :     /* transaction ID */
    1101          92 :     pq_sendint32(out, txn->xid);
    1102             : 
    1103             :     /* send the flags field (unused for now) */
    1104          92 :     pq_sendbyte(out, flags);
    1105             : 
    1106             :     /* send fields */
    1107          92 :     pq_sendint64(out, commit_lsn);
    1108          92 :     pq_sendint64(out, txn->end_lsn);
    1109          92 :     pq_sendint64(out, txn->xact_time.commit_time);
    1110          92 : }
    1111             : 
    1112             : /*
    1113             :  * Read STREAM COMMIT from the output stream.
    1114             :  */
    1115             : TransactionId
    1116         122 : logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
    1117             : {
    1118             :     TransactionId xid;
    1119             :     uint8       flags;
    1120             : 
    1121         122 :     xid = pq_getmsgint(in, 4);
    1122             : 
    1123             :     /* read flags (unused for now) */
    1124         122 :     flags = pq_getmsgbyte(in);
    1125             : 
    1126         122 :     if (flags != 0)
    1127           0 :         elog(ERROR, "unrecognized flags %u in commit message", flags);
    1128             : 
    1129             :     /* read fields */
    1130         122 :     commit_data->commit_lsn = pq_getmsgint64(in);
    1131         122 :     commit_data->end_lsn = pq_getmsgint64(in);
    1132         122 :     commit_data->committime = pq_getmsgint64(in);
    1133             : 
    1134         122 :     return xid;
    1135             : }
    1136             : 
    1137             : /*
    1138             :  * Write STREAM ABORT to the output stream. Note that xid and subxid will be
    1139             :  * same for the top-level transaction abort.
    1140             :  *
    1141             :  * If write_abort_info is true, send the abort_lsn and abort_time fields,
    1142             :  * otherwise don't.
    1143             :  */
    1144             : void
    1145          52 : logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
    1146             :                               TransactionId subxid, XLogRecPtr abort_lsn,
    1147             :                               TimestampTz abort_time, bool write_abort_info)
    1148             : {
    1149          52 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
    1150             : 
    1151             :     Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
    1152             : 
    1153             :     /* transaction ID */
    1154          52 :     pq_sendint32(out, xid);
    1155          52 :     pq_sendint32(out, subxid);
    1156             : 
    1157          52 :     if (write_abort_info)
    1158             :     {
    1159          24 :         pq_sendint64(out, abort_lsn);
    1160          24 :         pq_sendint64(out, abort_time);
    1161             :     }
    1162          52 : }
    1163             : 
    1164             : /*
    1165             :  * Read STREAM ABORT from the output stream.
    1166             :  *
    1167             :  * If read_abort_info is true, read the abort_lsn and abort_time fields,
    1168             :  * otherwise don't.
    1169             :  */
    1170             : void
    1171          76 : logicalrep_read_stream_abort(StringInfo in,
    1172             :                              LogicalRepStreamAbortData *abort_data,
    1173             :                              bool read_abort_info)
    1174             : {
    1175             :     Assert(abort_data);
    1176             : 
    1177          76 :     abort_data->xid = pq_getmsgint(in, 4);
    1178          76 :     abort_data->subxid = pq_getmsgint(in, 4);
    1179             : 
    1180          76 :     if (read_abort_info)
    1181             :     {
    1182          48 :         abort_data->abort_lsn = pq_getmsgint64(in);
    1183          48 :         abort_data->abort_time = pq_getmsgint64(in);
    1184             :     }
    1185             :     else
    1186             :     {
    1187          28 :         abort_data->abort_lsn = InvalidXLogRecPtr;
    1188          28 :         abort_data->abort_time = 0;
    1189             :     }
    1190          76 : }
    1191             : 
    1192             : /*
    1193             :  * Get string representing LogicalRepMsgType.
    1194             :  */
    1195             : const char *
    1196         720 : logicalrep_message_type(LogicalRepMsgType action)
    1197             : {
    1198             :     static char err_unknown[20];
    1199             : 
    1200         720 :     switch (action)
    1201             :     {
    1202           2 :         case LOGICAL_REP_MSG_BEGIN:
    1203           2 :             return "BEGIN";
    1204           2 :         case LOGICAL_REP_MSG_COMMIT:
    1205           2 :             return "COMMIT";
    1206           0 :         case LOGICAL_REP_MSG_ORIGIN:
    1207           0 :             return "ORIGIN";
    1208          72 :         case LOGICAL_REP_MSG_INSERT:
    1209          72 :             return "INSERT";
    1210          30 :         case LOGICAL_REP_MSG_UPDATE:
    1211          30 :             return "UPDATE";
    1212          22 :         case LOGICAL_REP_MSG_DELETE:
    1213          22 :             return "DELETE";
    1214           0 :         case LOGICAL_REP_MSG_TRUNCATE:
    1215           0 :             return "TRUNCATE";
    1216           4 :         case LOGICAL_REP_MSG_RELATION:
    1217           4 :             return "RELATION";
    1218           0 :         case LOGICAL_REP_MSG_TYPE:
    1219           0 :             return "TYPE";
    1220           0 :         case LOGICAL_REP_MSG_MESSAGE:
    1221           0 :             return "MESSAGE";
    1222           2 :         case LOGICAL_REP_MSG_BEGIN_PREPARE:
    1223           2 :             return "BEGIN PREPARE";
    1224           4 :         case LOGICAL_REP_MSG_PREPARE:
    1225           4 :             return "PREPARE";
    1226           0 :         case LOGICAL_REP_MSG_COMMIT_PREPARED:
    1227           0 :             return "COMMIT PREPARED";
    1228           0 :         case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
    1229           0 :             return "ROLLBACK PREPARED";
    1230          26 :         case LOGICAL_REP_MSG_STREAM_START:
    1231          26 :             return "STREAM START";
    1232         474 :         case LOGICAL_REP_MSG_STREAM_STOP:
    1233         474 :             return "STREAM STOP";
    1234          40 :         case LOGICAL_REP_MSG_STREAM_COMMIT:
    1235          40 :             return "STREAM COMMIT";
    1236          38 :         case LOGICAL_REP_MSG_STREAM_ABORT:
    1237          38 :             return "STREAM ABORT";
    1238           4 :         case LOGICAL_REP_MSG_STREAM_PREPARE:
    1239           4 :             return "STREAM PREPARE";
    1240             :     }
    1241             : 
    1242             :     /*
    1243             :      * This message provides context in the error raised when applying a
    1244             :      * logical message. So we can't throw an error here. Return an unknown
    1245             :      * indicator value so that the original error is still reported.
    1246             :      */
    1247           0 :     snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);
    1248             : 
    1249           0 :     return err_unknown;
    1250             : }
    1251             : 
    1252             : /*
    1253             :  * Check if the column 'att' of a table should be published.
    1254             :  *
    1255             :  * 'columns' represents the publication column list (if any) for that table.
    1256             :  *
    1257             :  * 'include_gencols' flag indicates whether generated columns should be
    1258             :  * published when there is no column list. Typically, this will have the same
    1259             :  * value as the 'publish_generated_columns' publication parameter.
    1260             :  *
    1261             :  * Note that generated columns can be published only when present in a
    1262             :  * publication column list, or when include_gencols is true.
    1263             :  */
    1264             : bool
    1265     1468782 : logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns,
    1266             :                                  bool include_gencols)
    1267             : {
    1268     1468782 :     if (att->attisdropped)
    1269         102 :         return false;
    1270             : 
    1271             :     /* If a column list is provided, publish only the cols in that list. */
    1272     1468680 :     if (columns)
    1273        1888 :         return bms_is_member(att->attnum, columns);
    1274             : 
    1275             :     /* All non-generated columns are always published. */
    1276     1466792 :     return att->attgenerated ? include_gencols : true;
    1277             : }

Generated by: LCOV version 1.14