LCOV - code coverage report
Current view: top level - src/backend/replication/logical - proto.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 451 497 90.7 %
Date: 2025-03-23 00:15:41 Functions: 45 46 97.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * proto.c
       4             :  *      logical replication protocol functions
       5             :  *
       6             :  * Copyright (c) 2015-2025, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *      src/backend/replication/logical/proto.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "access/sysattr.h"
      16             : #include "catalog/pg_namespace.h"
      17             : #include "catalog/pg_type.h"
      18             : #include "libpq/pqformat.h"
      19             : #include "replication/logicalproto.h"
      20             : #include "utils/lsyscache.h"
      21             : #include "utils/syscache.h"
      22             : 
      23             : /*
      24             :  * Protocol message flags.
      25             :  */
      26             : #define LOGICALREP_IS_REPLICA_IDENTITY 1
      27             : 
      28             : #define MESSAGE_TRANSACTIONAL (1<<0)
      29             : #define TRUNCATE_CASCADE        (1<<0)
      30             : #define TRUNCATE_RESTART_SEQS   (1<<1)
      31             : 
      32             : static void logicalrep_write_attrs(StringInfo out, Relation rel,
      33             :                                    Bitmapset *columns,
      34             :                                    PublishGencolsType include_gencols_type);
      35             : static void logicalrep_write_tuple(StringInfo out, Relation rel,
      36             :                                    TupleTableSlot *slot,
      37             :                                    bool binary, Bitmapset *columns,
      38             :                                    PublishGencolsType include_gencols_type);
      39             : static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
      40             : static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
      41             : 
      42             : static void logicalrep_write_namespace(StringInfo out, Oid nspid);
      43             : static const char *logicalrep_read_namespace(StringInfo in);
      44             : 
      45             : /*
      46             :  * Write BEGIN to the output stream.
      47             :  */
      48             : void
      49         820 : logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
      50             : {
      51         820 :     pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
      52             : 
      53             :     /* fixed fields */
      54         820 :     pq_sendint64(out, txn->final_lsn);
      55         820 :     pq_sendint64(out, txn->xact_time.commit_time);
      56         820 :     pq_sendint32(out, txn->xid);
      57         820 : }
      58             : 
      59             : /*
      60             :  * Read transaction BEGIN from the stream.
      61             :  */
      62             : void
      63         886 : logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
      64             : {
      65             :     /* read fields */
      66         886 :     begin_data->final_lsn = pq_getmsgint64(in);
      67         886 :     if (begin_data->final_lsn == InvalidXLogRecPtr)
      68           0 :         elog(ERROR, "final_lsn not set in begin message");
      69         886 :     begin_data->committime = pq_getmsgint64(in);
      70         886 :     begin_data->xid = pq_getmsgint(in, 4);
      71         886 : }
      72             : 
      73             : 
      74             : /*
      75             :  * Write COMMIT to the output stream.
      76             :  */
      77             : void
      78         816 : logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
      79             :                         XLogRecPtr commit_lsn)
      80             : {
      81         816 :     uint8       flags = 0;
      82             : 
      83         816 :     pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
      84             : 
      85             :     /* send the flags field (unused for now) */
      86         816 :     pq_sendbyte(out, flags);
      87             : 
      88             :     /* send fields */
      89         816 :     pq_sendint64(out, commit_lsn);
      90         816 :     pq_sendint64(out, txn->end_lsn);
      91         816 :     pq_sendint64(out, txn->xact_time.commit_time);
      92         816 : }
      93             : 
      94             : /*
      95             :  * Read transaction COMMIT from the stream.
      96             :  */
      97             : void
      98         840 : logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
      99             : {
     100             :     /* read flags (unused for now) */
     101         840 :     uint8       flags = pq_getmsgbyte(in);
     102             : 
     103         840 :     if (flags != 0)
     104           0 :         elog(ERROR, "unrecognized flags %u in commit message", flags);
     105             : 
     106             :     /* read fields */
     107         840 :     commit_data->commit_lsn = pq_getmsgint64(in);
     108         840 :     commit_data->end_lsn = pq_getmsgint64(in);
     109         840 :     commit_data->committime = pq_getmsgint64(in);
     110         840 : }
     111             : 
     112             : /*
     113             :  * Write BEGIN PREPARE to the output stream.
     114             :  */
     115             : void
     116          40 : logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
     117             : {
     118          40 :     pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE);
     119             : 
     120             :     /* fixed fields */
     121          40 :     pq_sendint64(out, txn->final_lsn);
     122          40 :     pq_sendint64(out, txn->end_lsn);
     123          40 :     pq_sendint64(out, txn->xact_time.prepare_time);
     124          40 :     pq_sendint32(out, txn->xid);
     125             : 
     126             :     /* send gid */
     127          40 :     pq_sendstring(out, txn->gid);
     128          40 : }
     129             : 
     130             : /*
     131             :  * Read transaction BEGIN PREPARE from the stream.
     132             :  */
     133             : void
     134          32 : logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
     135             : {
     136             :     /* read fields */
     137          32 :     begin_data->prepare_lsn = pq_getmsgint64(in);
     138          32 :     if (begin_data->prepare_lsn == InvalidXLogRecPtr)
     139           0 :         elog(ERROR, "prepare_lsn not set in begin prepare message");
     140          32 :     begin_data->end_lsn = pq_getmsgint64(in);
     141          32 :     if (begin_data->end_lsn == InvalidXLogRecPtr)
     142           0 :         elog(ERROR, "end_lsn not set in begin prepare message");
     143          32 :     begin_data->prepare_time = pq_getmsgint64(in);
     144          32 :     begin_data->xid = pq_getmsgint(in, 4);
     145             : 
     146             :     /* read gid (copy it into a pre-allocated buffer) */
     147          32 :     strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
     148          32 : }
     149             : 
     150             : /*
     151             :  * The core functionality for logicalrep_write_prepare and
     152             :  * logicalrep_write_stream_prepare.
     153             :  */
     154             : static void
     155          68 : logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
     156             :                                 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
     157             : {
     158          68 :     uint8       flags = 0;
     159             : 
     160          68 :     pq_sendbyte(out, type);
     161             : 
     162             :     /*
     163             :      * This should only ever happen for two-phase commit transactions, in
     164             :      * which case we expect to have a valid GID.
     165             :      */
     166             :     Assert(txn->gid != NULL);
     167             :     Assert(rbtxn_is_prepared(txn));
     168             :     Assert(TransactionIdIsValid(txn->xid));
     169             : 
     170             :     /* send the flags field */
     171          68 :     pq_sendbyte(out, flags);
     172             : 
     173             :     /* send fields */
     174          68 :     pq_sendint64(out, prepare_lsn);
     175          68 :     pq_sendint64(out, txn->end_lsn);
     176          68 :     pq_sendint64(out, txn->xact_time.prepare_time);
     177          68 :     pq_sendint32(out, txn->xid);
     178             : 
     179             :     /* send gid */
     180          68 :     pq_sendstring(out, txn->gid);
     181          68 : }
     182             : 
     183             : /*
     184             :  * Write PREPARE to the output stream.
     185             :  */
     186             : void
     187          40 : logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
     188             :                          XLogRecPtr prepare_lsn)
     189             : {
     190          40 :     logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
     191             :                                     txn, prepare_lsn);
     192          40 : }
     193             : 
     194             : /*
     195             :  * The core functionality for logicalrep_read_prepare and
     196             :  * logicalrep_read_stream_prepare.
     197             :  */
     198             : static void
     199          52 : logicalrep_read_prepare_common(StringInfo in, char *msgtype,
     200             :                                LogicalRepPreparedTxnData *prepare_data)
     201             : {
     202             :     /* read flags */
     203          52 :     uint8       flags = pq_getmsgbyte(in);
     204             : 
     205          52 :     if (flags != 0)
     206           0 :         elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
     207             : 
     208             :     /* read fields */
     209          52 :     prepare_data->prepare_lsn = pq_getmsgint64(in);
     210          52 :     if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
     211           0 :         elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
     212          52 :     prepare_data->end_lsn = pq_getmsgint64(in);
     213          52 :     if (prepare_data->end_lsn == InvalidXLogRecPtr)
     214           0 :         elog(ERROR, "end_lsn is not set in %s message", msgtype);
     215          52 :     prepare_data->prepare_time = pq_getmsgint64(in);
     216          52 :     prepare_data->xid = pq_getmsgint(in, 4);
     217          52 :     if (prepare_data->xid == InvalidTransactionId)
     218           0 :         elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
     219             : 
     220             :     /* read gid (copy it into a pre-allocated buffer) */
     221          52 :     strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
     222          52 : }
     223             : 
     224             : /*
     225             :  * Read transaction PREPARE from the stream.
     226             :  */
     227             : void
     228          30 : logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
     229             : {
     230          30 :     logicalrep_read_prepare_common(in, "prepare", prepare_data);
     231          30 : }
     232             : 
     233             : /*
     234             :  * Write COMMIT PREPARED to the output stream.
     235             :  */
     236             : void
     237          48 : logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
     238             :                                  XLogRecPtr commit_lsn)
     239             : {
     240          48 :     uint8       flags = 0;
     241             : 
     242          48 :     pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
     243             : 
     244             :     /*
     245             :      * This should only ever happen for two-phase commit transactions, in
     246             :      * which case we expect to have a valid GID.
     247             :      */
     248             :     Assert(txn->gid != NULL);
     249             : 
     250             :     /* send the flags field */
     251          48 :     pq_sendbyte(out, flags);
     252             : 
     253             :     /* send fields */
     254          48 :     pq_sendint64(out, commit_lsn);
     255          48 :     pq_sendint64(out, txn->end_lsn);
     256          48 :     pq_sendint64(out, txn->xact_time.commit_time);
     257          48 :     pq_sendint32(out, txn->xid);
     258             : 
     259             :     /* send gid */
     260          48 :     pq_sendstring(out, txn->gid);
     261          48 : }
     262             : 
     263             : /*
     264             :  * Read transaction COMMIT PREPARED from the stream.
     265             :  */
     266             : void
     267          40 : logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
     268             : {
     269             :     /* read flags */
     270          40 :     uint8       flags = pq_getmsgbyte(in);
     271             : 
     272          40 :     if (flags != 0)
     273           0 :         elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
     274             : 
     275             :     /* read fields */
     276          40 :     prepare_data->commit_lsn = pq_getmsgint64(in);
     277          40 :     if (prepare_data->commit_lsn == InvalidXLogRecPtr)
     278           0 :         elog(ERROR, "commit_lsn is not set in commit prepared message");
     279          40 :     prepare_data->end_lsn = pq_getmsgint64(in);
     280          40 :     if (prepare_data->end_lsn == InvalidXLogRecPtr)
     281           0 :         elog(ERROR, "end_lsn is not set in commit prepared message");
     282          40 :     prepare_data->commit_time = pq_getmsgint64(in);
     283          40 :     prepare_data->xid = pq_getmsgint(in, 4);
     284             : 
     285             :     /* read gid (copy it into a pre-allocated buffer) */
     286          40 :     strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
     287          40 : }
     288             : 
     289             : /*
     290             :  * Write ROLLBACK PREPARED to the output stream.
     291             :  */
     292             : void
     293          18 : logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
     294             :                                    XLogRecPtr prepare_end_lsn,
     295             :                                    TimestampTz prepare_time)
     296             : {
     297          18 :     uint8       flags = 0;
     298             : 
     299          18 :     pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
     300             : 
     301             :     /*
     302             :      * This should only ever happen for two-phase commit transactions, in
     303             :      * which case we expect to have a valid GID.
     304             :      */
     305             :     Assert(txn->gid != NULL);
     306             : 
     307             :     /* send the flags field */
     308          18 :     pq_sendbyte(out, flags);
     309             : 
     310             :     /* send fields */
     311          18 :     pq_sendint64(out, prepare_end_lsn);
     312          18 :     pq_sendint64(out, txn->end_lsn);
     313          18 :     pq_sendint64(out, prepare_time);
     314          18 :     pq_sendint64(out, txn->xact_time.commit_time);
     315          18 :     pq_sendint32(out, txn->xid);
     316             : 
     317             :     /* send gid */
     318          18 :     pq_sendstring(out, txn->gid);
     319          18 : }
     320             : 
     321             : /*
     322             :  * Read transaction ROLLBACK PREPARED from the stream.
     323             :  */
     324             : void
     325          10 : logicalrep_read_rollback_prepared(StringInfo in,
     326             :                                   LogicalRepRollbackPreparedTxnData *rollback_data)
     327             : {
     328             :     /* read flags */
     329          10 :     uint8       flags = pq_getmsgbyte(in);
     330             : 
     331          10 :     if (flags != 0)
     332           0 :         elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
     333             : 
     334             :     /* read fields */
     335          10 :     rollback_data->prepare_end_lsn = pq_getmsgint64(in);
     336          10 :     if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
     337           0 :         elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
     338          10 :     rollback_data->rollback_end_lsn = pq_getmsgint64(in);
     339          10 :     if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
     340           0 :         elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
     341          10 :     rollback_data->prepare_time = pq_getmsgint64(in);
     342          10 :     rollback_data->rollback_time = pq_getmsgint64(in);
     343          10 :     rollback_data->xid = pq_getmsgint(in, 4);
     344             : 
     345             :     /* read gid (copy it into a pre-allocated buffer) */
     346          10 :     strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
     347          10 : }
     348             : 
     349             : /*
     350             :  * Write STREAM PREPARE to the output stream.
     351             :  */
     352             : void
     353          28 : logicalrep_write_stream_prepare(StringInfo out,
     354             :                                 ReorderBufferTXN *txn,
     355             :                                 XLogRecPtr prepare_lsn)
     356             : {
     357          28 :     logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE,
     358             :                                     txn, prepare_lsn);
     359          28 : }
     360             : 
     361             : /*
     362             :  * Read STREAM PREPARE from the stream.
     363             :  */
     364             : void
     365          22 : logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
     366             : {
     367          22 :     logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
     368          22 : }
     369             : 
     370             : /*
     371             :  * Write ORIGIN to the output stream.
     372             :  */
     373             : void
     374          18 : logicalrep_write_origin(StringInfo out, const char *origin,
     375             :                         XLogRecPtr origin_lsn)
     376             : {
     377          18 :     pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
     378             : 
     379             :     /* fixed fields */
     380          18 :     pq_sendint64(out, origin_lsn);
     381             : 
     382             :     /* origin string */
     383          18 :     pq_sendstring(out, origin);
     384          18 : }
     385             : 
     386             : /*
     387             :  * Read ORIGIN from the output stream.
     388             :  */
     389             : char *
     390           0 : logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
     391             : {
     392             :     /* fixed fields */
     393           0 :     *origin_lsn = pq_getmsgint64(in);
     394             : 
     395             :     /* return origin */
     396           0 :     return pstrdup(pq_getmsgstring(in));
     397             : }
     398             : 
     399             : /*
     400             :  * Write INSERT to the output stream.
     401             :  */
     402             : void
     403      211772 : logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
     404             :                         TupleTableSlot *newslot, bool binary,
     405             :                         Bitmapset *columns,
     406             :                         PublishGencolsType include_gencols_type)
     407             : {
     408      211772 :     pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
     409             : 
     410             :     /* transaction ID (if not valid, we're not streaming) */
     411      211772 :     if (TransactionIdIsValid(xid))
     412      200168 :         pq_sendint32(out, xid);
     413             : 
     414             :     /* use Oid as relation identifier */
     415      211772 :     pq_sendint32(out, RelationGetRelid(rel));
     416             : 
     417      211772 :     pq_sendbyte(out, 'N');      /* new tuple follows */
     418      211772 :     logicalrep_write_tuple(out, rel, newslot, binary, columns,
     419             :                            include_gencols_type);
     420      211772 : }
     421             : 
     422             : /*
     423             :  * Read INSERT from stream.
     424             :  *
     425             :  * Fills the new tuple.
     426             :  */
     427             : LogicalRepRelId
     428      152712 : logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
     429             : {
     430             :     char        action;
     431             :     LogicalRepRelId relid;
     432             : 
     433             :     /* read the relation id */
     434      152712 :     relid = pq_getmsgint(in, 4);
     435             : 
     436      152712 :     action = pq_getmsgbyte(in);
     437      152712 :     if (action != 'N')
     438           0 :         elog(ERROR, "expected new tuple but got %d",
     439             :              action);
     440             : 
     441      152712 :     logicalrep_read_tuple(in, newtup);
     442             : 
     443      152712 :     return relid;
     444             : }
     445             : 
     446             : /*
     447             :  * Write UPDATE to the output stream.
     448             :  */
     449             : void
     450       68876 : logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
     451             :                         TupleTableSlot *oldslot, TupleTableSlot *newslot,
     452             :                         bool binary, Bitmapset *columns,
     453             :                         PublishGencolsType include_gencols_type)
     454             : {
     455       68876 :     pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
     456             : 
     457             :     Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
     458             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     459             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
     460             : 
     461             :     /* transaction ID (if not valid, we're not streaming) */
     462       68876 :     if (TransactionIdIsValid(xid))
     463       68464 :         pq_sendint32(out, xid);
     464             : 
     465             :     /* use Oid as relation identifier */
     466       68876 :     pq_sendint32(out, RelationGetRelid(rel));
     467             : 
     468       68876 :     if (oldslot != NULL)
     469             :     {
     470         252 :         if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     471         120 :             pq_sendbyte(out, 'O');  /* old tuple follows */
     472             :         else
     473         132 :             pq_sendbyte(out, 'K');  /* old key follows */
     474         252 :         logicalrep_write_tuple(out, rel, oldslot, binary, columns,
     475             :                                include_gencols_type);
     476             :     }
     477             : 
     478       68876 :     pq_sendbyte(out, 'N');      /* new tuple follows */
     479       68876 :     logicalrep_write_tuple(out, rel, newslot, binary, columns,
     480             :                            include_gencols_type);
     481       68876 : }
     482             : 
     483             : /*
     484             :  * Read UPDATE from stream.
     485             :  */
     486             : LogicalRepRelId
     487       63866 : logicalrep_read_update(StringInfo in, bool *has_oldtuple,
     488             :                        LogicalRepTupleData *oldtup,
     489             :                        LogicalRepTupleData *newtup)
     490             : {
     491             :     char        action;
     492             :     LogicalRepRelId relid;
     493             : 
     494             :     /* read the relation id */
     495       63866 :     relid = pq_getmsgint(in, 4);
     496             : 
     497             :     /* read and verify action */
     498       63866 :     action = pq_getmsgbyte(in);
     499       63866 :     if (action != 'K' && action != 'O' && action != 'N')
     500           0 :         elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
     501             :              action);
     502             : 
     503             :     /* check for old tuple */
     504       63866 :     if (action == 'K' || action == 'O')
     505             :     {
     506         258 :         logicalrep_read_tuple(in, oldtup);
     507         258 :         *has_oldtuple = true;
     508             : 
     509         258 :         action = pq_getmsgbyte(in);
     510             :     }
     511             :     else
     512       63608 :         *has_oldtuple = false;
     513             : 
     514             :     /* check for new  tuple */
     515       63866 :     if (action != 'N')
     516           0 :         elog(ERROR, "expected action 'N', got %c",
     517             :              action);
     518             : 
     519       63866 :     logicalrep_read_tuple(in, newtup);
     520             : 
     521       63866 :     return relid;
     522             : }
     523             : 
     524             : /*
     525             :  * Write DELETE to the output stream.
     526             :  */
     527             : void
     528       83766 : logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
     529             :                         TupleTableSlot *oldslot, bool binary,
     530             :                         Bitmapset *columns,
     531             :                         PublishGencolsType include_gencols_type)
     532             : {
     533             :     Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
     534             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     535             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
     536             : 
     537       83766 :     pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
     538             : 
     539             :     /* transaction ID (if not valid, we're not streaming) */
     540       83766 :     if (TransactionIdIsValid(xid))
     541       83250 :         pq_sendint32(out, xid);
     542             : 
     543             :     /* use Oid as relation identifier */
     544       83766 :     pq_sendint32(out, RelationGetRelid(rel));
     545             : 
     546       83766 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     547         254 :         pq_sendbyte(out, 'O');  /* old tuple follows */
     548             :     else
     549       83512 :         pq_sendbyte(out, 'K');  /* old key follows */
     550             : 
     551       83766 :     logicalrep_write_tuple(out, rel, oldslot, binary, columns,
     552             :                            include_gencols_type);
     553       83766 : }
     554             : 
     555             : /*
     556             :  * Read DELETE from stream.
     557             :  *
     558             :  * Fills the old tuple.
     559             :  */
     560             : LogicalRepRelId
     561       80636 : logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
     562             : {
     563             :     char        action;
     564             :     LogicalRepRelId relid;
     565             : 
     566             :     /* read the relation id */
     567       80636 :     relid = pq_getmsgint(in, 4);
     568             : 
     569             :     /* read and verify action */
     570       80636 :     action = pq_getmsgbyte(in);
     571       80636 :     if (action != 'K' && action != 'O')
     572           0 :         elog(ERROR, "expected action 'O' or 'K', got %c", action);
     573             : 
     574       80636 :     logicalrep_read_tuple(in, oldtup);
     575             : 
     576       80636 :     return relid;
     577             : }
     578             : 
     579             : /*
     580             :  * Write TRUNCATE to the output stream.
     581             :  */
     582             : void
     583          20 : logicalrep_write_truncate(StringInfo out,
     584             :                           TransactionId xid,
     585             :                           int nrelids,
     586             :                           Oid relids[],
     587             :                           bool cascade, bool restart_seqs)
     588             : {
     589             :     int         i;
     590          20 :     uint8       flags = 0;
     591             : 
     592          20 :     pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
     593             : 
     594             :     /* transaction ID (if not valid, we're not streaming) */
     595          20 :     if (TransactionIdIsValid(xid))
     596           0 :         pq_sendint32(out, xid);
     597             : 
     598          20 :     pq_sendint32(out, nrelids);
     599             : 
     600             :     /* encode and send truncate flags */
     601          20 :     if (cascade)
     602           0 :         flags |= TRUNCATE_CASCADE;
     603          20 :     if (restart_seqs)
     604           0 :         flags |= TRUNCATE_RESTART_SEQS;
     605          20 :     pq_sendint8(out, flags);
     606             : 
     607          52 :     for (i = 0; i < nrelids; i++)
     608          32 :         pq_sendint32(out, relids[i]);
     609          20 : }
     610             : 
     611             : /*
     612             :  * Read TRUNCATE from stream.
     613             :  */
     614             : List *
     615          38 : logicalrep_read_truncate(StringInfo in,
     616             :                          bool *cascade, bool *restart_seqs)
     617             : {
     618             :     int         i;
     619             :     int         nrelids;
     620          38 :     List       *relids = NIL;
     621             :     uint8       flags;
     622             : 
     623          38 :     nrelids = pq_getmsgint(in, 4);
     624             : 
     625             :     /* read and decode truncate flags */
     626          38 :     flags = pq_getmsgint(in, 1);
     627          38 :     *cascade = (flags & TRUNCATE_CASCADE) > 0;
     628          38 :     *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
     629             : 
     630          94 :     for (i = 0; i < nrelids; i++)
     631          56 :         relids = lappend_oid(relids, pq_getmsgint(in, 4));
     632             : 
     633          38 :     return relids;
     634             : }
     635             : 
     636             : /*
     637             :  * Write MESSAGE to stream
     638             :  */
     639             : void
     640          10 : logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
     641             :                          bool transactional, const char *prefix, Size sz,
     642             :                          const char *message)
     643             : {
     644          10 :     uint8       flags = 0;
     645             : 
     646          10 :     pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
     647             : 
     648             :     /* encode and send message flags */
     649          10 :     if (transactional)
     650           4 :         flags |= MESSAGE_TRANSACTIONAL;
     651             : 
     652             :     /* transaction ID (if not valid, we're not streaming) */
     653          10 :     if (TransactionIdIsValid(xid))
     654           0 :         pq_sendint32(out, xid);
     655             : 
     656          10 :     pq_sendint8(out, flags);
     657          10 :     pq_sendint64(out, lsn);
     658          10 :     pq_sendstring(out, prefix);
     659          10 :     pq_sendint32(out, sz);
     660          10 :     pq_sendbytes(out, message, sz);
     661          10 : }
     662             : 
     663             : /*
     664             :  * Write relation description to the output stream.
     665             :  */
     666             : void
     667         678 : logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
     668             :                      Bitmapset *columns,
     669             :                      PublishGencolsType include_gencols_type)
     670             : {
     671             :     char       *relname;
     672             : 
     673         678 :     pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
     674             : 
     675             :     /* transaction ID (if not valid, we're not streaming) */
     676         678 :     if (TransactionIdIsValid(xid))
     677         142 :         pq_sendint32(out, xid);
     678             : 
     679             :     /* use Oid as relation identifier */
     680         678 :     pq_sendint32(out, RelationGetRelid(rel));
     681             : 
     682             :     /* send qualified relation name */
     683         678 :     logicalrep_write_namespace(out, RelationGetNamespace(rel));
     684         678 :     relname = RelationGetRelationName(rel);
     685         678 :     pq_sendstring(out, relname);
     686             : 
     687             :     /* send replica identity */
     688         678 :     pq_sendbyte(out, rel->rd_rel->relreplident);
     689             : 
     690             :     /* send the attribute info */
     691         678 :     logicalrep_write_attrs(out, rel, columns, include_gencols_type);
     692         678 : }
     693             : 
     694             : /*
     695             :  * Read the relation info from stream and return as LogicalRepRelation.
     696             :  */
     697             : LogicalRepRelation *
     698         786 : logicalrep_read_rel(StringInfo in)
     699             : {
     700         786 :     LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
     701             : 
     702         786 :     rel->remoteid = pq_getmsgint(in, 4);
     703             : 
     704             :     /* Read relation name from stream */
     705         786 :     rel->nspname = pstrdup(logicalrep_read_namespace(in));
     706         786 :     rel->relname = pstrdup(pq_getmsgstring(in));
     707             : 
     708             :     /* Read the replica identity. */
     709         786 :     rel->replident = pq_getmsgbyte(in);
     710             : 
     711             :     /* Get attribute description */
     712         786 :     logicalrep_read_attrs(in, rel);
     713             : 
     714         786 :     return rel;
     715             : }
     716             : 
     717             : /*
     718             :  * Write type info to the output stream.
     719             :  *
     720             :  * This function will always write base type info.
     721             :  */
     722             : void
     723          36 : logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
     724             : {
     725          36 :     Oid         basetypoid = getBaseType(typoid);
     726             :     HeapTuple   tup;
     727             :     Form_pg_type typtup;
     728             : 
     729          36 :     pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
     730             : 
     731             :     /* transaction ID (if not valid, we're not streaming) */
     732          36 :     if (TransactionIdIsValid(xid))
     733           0 :         pq_sendint32(out, xid);
     734             : 
     735          36 :     tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
     736          36 :     if (!HeapTupleIsValid(tup))
     737           0 :         elog(ERROR, "cache lookup failed for type %u", basetypoid);
     738          36 :     typtup = (Form_pg_type) GETSTRUCT(tup);
     739             : 
     740             :     /* use Oid as type identifier */
     741          36 :     pq_sendint32(out, typoid);
     742             : 
     743             :     /* send qualified type name */
     744          36 :     logicalrep_write_namespace(out, typtup->typnamespace);
     745          36 :     pq_sendstring(out, NameStr(typtup->typname));
     746             : 
     747          36 :     ReleaseSysCache(tup);
     748          36 : }
     749             : 
     750             : /*
     751             :  * Read type info from the output stream.
     752             :  */
     753             : void
     754          36 : logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
     755             : {
     756          36 :     ltyp->remoteid = pq_getmsgint(in, 4);
     757             : 
     758             :     /* Read type name from stream */
     759          36 :     ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
     760          36 :     ltyp->typname = pstrdup(pq_getmsgstring(in));
     761          36 : }
     762             : 
     763             : /*
     764             :  * Write a tuple to the outputstream, in the most efficient format possible.
     765             :  */
     766             : static void
     767      364666 : logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
     768             :                        bool binary, Bitmapset *columns,
     769             :                        PublishGencolsType include_gencols_type)
     770             : {
     771             :     TupleDesc   desc;
     772             :     Datum      *values;
     773             :     bool       *isnull;
     774             :     int         i;
     775      364666 :     uint16      nliveatts = 0;
     776             : 
     777      364666 :     desc = RelationGetDescr(rel);
     778             : 
     779     1096998 :     for (i = 0; i < desc->natts; i++)
     780             :     {
     781      732332 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     782             : 
     783      732332 :         if (!logicalrep_should_publish_column(att, columns,
     784             :                                               include_gencols_type))
     785         282 :             continue;
     786             : 
     787      732050 :         nliveatts++;
     788             :     }
     789      364666 :     pq_sendint16(out, nliveatts);
     790             : 
     791      364666 :     slot_getallattrs(slot);
     792      364666 :     values = slot->tts_values;
     793      364666 :     isnull = slot->tts_isnull;
     794             : 
     795             :     /* Write the values */
     796     1096998 :     for (i = 0; i < desc->natts; i++)
     797             :     {
     798             :         HeapTuple   typtup;
     799             :         Form_pg_type typclass;
     800      732332 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     801             : 
     802      732332 :         if (!logicalrep_should_publish_column(att, columns,
     803             :                                               include_gencols_type))
     804         282 :             continue;
     805             : 
     806      732050 :         if (isnull[i])
     807             :         {
     808      103830 :             pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
     809      103830 :             continue;
     810             :         }
     811             : 
     812      628220 :         if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
     813             :         {
     814             :             /*
     815             :              * Unchanged toasted datum.  (Note that we don't promise to detect
     816             :              * unchanged data in general; this is just a cheap check to avoid
     817             :              * sending large values unnecessarily.)
     818             :              */
     819           6 :             pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
     820           6 :             continue;
     821             :         }
     822             : 
     823      628214 :         typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
     824      628214 :         if (!HeapTupleIsValid(typtup))
     825           0 :             elog(ERROR, "cache lookup failed for type %u", att->atttypid);
     826      628214 :         typclass = (Form_pg_type) GETSTRUCT(typtup);
     827             : 
     828             :         /*
     829             :          * Send in binary if requested and type has suitable send function.
     830             :          */
     831      628214 :         if (binary && OidIsValid(typclass->typsend))
     832      230086 :         {
     833             :             bytea      *outputbytes;
     834             :             int         len;
     835             : 
     836      230086 :             pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
     837      230086 :             outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
     838      230086 :             len = VARSIZE(outputbytes) - VARHDRSZ;
     839      230086 :             pq_sendint(out, len, 4);    /* length */
     840      230086 :             pq_sendbytes(out, VARDATA(outputbytes), len);   /* data */
     841      230086 :             pfree(outputbytes);
     842             :         }
     843             :         else
     844             :         {
     845             :             char       *outputstr;
     846             : 
     847      398128 :             pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
     848      398128 :             outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
     849      398128 :             pq_sendcountedtext(out, outputstr, strlen(outputstr));
     850      398128 :             pfree(outputstr);
     851             :         }
     852             : 
     853      628214 :         ReleaseSysCache(typtup);
     854             :     }
     855      364666 : }
     856             : 
     857             : /*
     858             :  * Read tuple in logical replication format from stream.
     859             :  */
     860             : static void
     861      297472 : logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
     862             : {
     863             :     int         i;
     864             :     int         natts;
     865             : 
     866             :     /* Get number of attributes */
     867      297472 :     natts = pq_getmsgint(in, 2);
     868             : 
     869             :     /* Allocate space for per-column values; zero out unused StringInfoDatas */
     870      297472 :     tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
     871      297472 :     tuple->colstatus = (char *) palloc(natts * sizeof(char));
     872      297472 :     tuple->ncols = natts;
     873             : 
     874             :     /* Read the data */
     875      904178 :     for (i = 0; i < natts; i++)
     876             :     {
     877             :         char       *buff;
     878             :         char        kind;
     879             :         int         len;
     880      606706 :         StringInfo  value = &tuple->colvalues[i];
     881             : 
     882      606706 :         kind = pq_getmsgbyte(in);
     883      606706 :         tuple->colstatus[i] = kind;
     884             : 
     885      606706 :         switch (kind)
     886             :         {
     887      100722 :             case LOGICALREP_COLUMN_NULL:
     888             :                 /* nothing more to do */
     889      100722 :                 break;
     890           6 :             case LOGICALREP_COLUMN_UNCHANGED:
     891             :                 /* we don't receive the value of an unchanged column */
     892           6 :                 break;
     893      505978 :             case LOGICALREP_COLUMN_TEXT:
     894             :             case LOGICALREP_COLUMN_BINARY:
     895      505978 :                 len = pq_getmsgint(in, 4);  /* read length */
     896             : 
     897             :                 /* and data */
     898      505978 :                 buff = palloc(len + 1);
     899      505978 :                 pq_copymsgbytes(in, buff, len);
     900             : 
     901             :                 /*
     902             :                  * NUL termination is required for LOGICALREP_COLUMN_TEXT mode
     903             :                  * as input functions require that.  For
     904             :                  * LOGICALREP_COLUMN_BINARY it's not technically required, but
     905             :                  * it's harmless.
     906             :                  */
     907      505978 :                 buff[len] = '\0';
     908             : 
     909      505978 :                 initStringInfoFromString(value, buff, len);
     910      505978 :                 break;
     911           0 :             default:
     912           0 :                 elog(ERROR, "unrecognized data representation type '%c'", kind);
     913             :         }
     914             :     }
     915      297472 : }
     916             : 
     917             : /*
     918             :  * Write relation attribute metadata to the stream.
     919             :  */
     920             : static void
     921         678 : logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns,
     922             :                        PublishGencolsType include_gencols_type)
     923             : {
     924             :     TupleDesc   desc;
     925             :     int         i;
     926         678 :     uint16      nliveatts = 0;
     927         678 :     Bitmapset  *idattrs = NULL;
     928             :     bool        replidentfull;
     929             : 
     930         678 :     desc = RelationGetDescr(rel);
     931             : 
     932             :     /* send number of live attributes */
     933        2070 :     for (i = 0; i < desc->natts; i++)
     934             :     {
     935        1392 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     936             : 
     937        1392 :         if (!logicalrep_should_publish_column(att, columns,
     938             :                                               include_gencols_type))
     939         142 :             continue;
     940             : 
     941        1250 :         nliveatts++;
     942             :     }
     943         678 :     pq_sendint16(out, nliveatts);
     944             : 
     945             :     /* fetch bitmap of REPLICATION IDENTITY attributes */
     946         678 :     replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
     947         678 :     if (!replidentfull)
     948         578 :         idattrs = RelationGetIdentityKeyBitmap(rel);
     949             : 
     950             :     /* send the attributes */
     951        2070 :     for (i = 0; i < desc->natts; i++)
     952             :     {
     953        1392 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     954        1392 :         uint8       flags = 0;
     955             : 
     956        1392 :         if (!logicalrep_should_publish_column(att, columns,
     957             :                                               include_gencols_type))
     958         142 :             continue;
     959             : 
     960             :         /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
     961        2352 :         if (replidentfull ||
     962        1102 :             bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
     963             :                           idattrs))
     964         608 :             flags |= LOGICALREP_IS_REPLICA_IDENTITY;
     965             : 
     966        1250 :         pq_sendbyte(out, flags);
     967             : 
     968             :         /* attribute name */
     969        1250 :         pq_sendstring(out, NameStr(att->attname));
     970             : 
     971             :         /* attribute type id */
     972        1250 :         pq_sendint32(out, (int) att->atttypid);
     973             : 
     974             :         /* attribute mode */
     975        1250 :         pq_sendint32(out, att->atttypmod);
     976             :     }
     977             : 
     978         678 :     bms_free(idattrs);
     979         678 : }
     980             : 
     981             : /*
     982             :  * Read relation attribute metadata from the stream.
     983             :  */
     984             : static void
     985         786 : logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
     986             : {
     987             :     int         i;
     988             :     int         natts;
     989             :     char      **attnames;
     990             :     Oid        *atttyps;
     991         786 :     Bitmapset  *attkeys = NULL;
     992             : 
     993         786 :     natts = pq_getmsgint(in, 2);
     994         786 :     attnames = palloc(natts * sizeof(char *));
     995         786 :     atttyps = palloc(natts * sizeof(Oid));
     996             : 
     997             :     /* read the attributes */
     998        2218 :     for (i = 0; i < natts; i++)
     999             :     {
    1000             :         uint8       flags;
    1001             : 
    1002             :         /* Check for replica identity column */
    1003        1432 :         flags = pq_getmsgbyte(in);
    1004        1432 :         if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
    1005         686 :             attkeys = bms_add_member(attkeys, i);
    1006             : 
    1007             :         /* attribute name */
    1008        1432 :         attnames[i] = pstrdup(pq_getmsgstring(in));
    1009             : 
    1010             :         /* attribute type id */
    1011        1432 :         atttyps[i] = (Oid) pq_getmsgint(in, 4);
    1012             : 
    1013             :         /* we ignore attribute mode for now */
    1014        1432 :         (void) pq_getmsgint(in, 4);
    1015             :     }
    1016             : 
    1017         786 :     rel->attnames = attnames;
    1018         786 :     rel->atttyps = atttyps;
    1019         786 :     rel->attkeys = attkeys;
    1020         786 :     rel->natts = natts;
    1021         786 : }
    1022             : 
    1023             : /*
    1024             :  * Write the namespace name or empty string for pg_catalog (to save space).
    1025             :  */
    1026             : static void
    1027         714 : logicalrep_write_namespace(StringInfo out, Oid nspid)
    1028             : {
    1029         714 :     if (nspid == PG_CATALOG_NAMESPACE)
    1030           2 :         pq_sendbyte(out, '\0');
    1031             :     else
    1032             :     {
    1033         712 :         char       *nspname = get_namespace_name(nspid);
    1034             : 
    1035         712 :         if (nspname == NULL)
    1036           0 :             elog(ERROR, "cache lookup failed for namespace %u",
    1037             :                  nspid);
    1038             : 
    1039         712 :         pq_sendstring(out, nspname);
    1040             :     }
    1041         714 : }
    1042             : 
    1043             : /*
    1044             :  * Read the namespace name while treating empty string as pg_catalog.
    1045             :  */
    1046             : static const char *
    1047         822 : logicalrep_read_namespace(StringInfo in)
    1048             : {
    1049         822 :     const char *nspname = pq_getmsgstring(in);
    1050             : 
    1051         822 :     if (nspname[0] == '\0')
    1052           2 :         nspname = "pg_catalog";
    1053             : 
    1054         822 :     return nspname;
    1055             : }
    1056             : 
    1057             : /*
    1058             :  * Write the information for the start stream message to the output stream.
    1059             :  */
    1060             : void
    1061        1292 : logicalrep_write_stream_start(StringInfo out,
    1062             :                               TransactionId xid, bool first_segment)
    1063             : {
    1064        1292 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
    1065             : 
    1066             :     Assert(TransactionIdIsValid(xid));
    1067             : 
    1068             :     /* transaction ID (we're starting to stream, so must be valid) */
    1069        1292 :     pq_sendint32(out, xid);
    1070             : 
    1071             :     /* 1 if this is the first streaming segment for this xid */
    1072        1292 :     pq_sendbyte(out, first_segment ? 1 : 0);
    1073        1292 : }
    1074             : 
    1075             : /*
    1076             :  * Read the information about the start stream message from output stream.
    1077             :  */
    1078             : TransactionId
    1079        1712 : logicalrep_read_stream_start(StringInfo in, bool *first_segment)
    1080             : {
    1081             :     TransactionId xid;
    1082             : 
    1083             :     Assert(first_segment);
    1084             : 
    1085        1712 :     xid = pq_getmsgint(in, 4);
    1086        1712 :     *first_segment = (pq_getmsgbyte(in) == 1);
    1087             : 
    1088        1712 :     return xid;
    1089             : }
    1090             : 
    1091             : /*
    1092             :  * Write the stop stream message to the output stream.
    1093             :  */
    1094             : void
    1095        1292 : logicalrep_write_stream_stop(StringInfo out)
    1096             : {
    1097        1292 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_STOP);
    1098        1292 : }
    1099             : 
    1100             : /*
    1101             :  * Write STREAM COMMIT to the output stream.
    1102             :  */
    1103             : void
    1104          92 : logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
    1105             :                                XLogRecPtr commit_lsn)
    1106             : {
    1107          92 :     uint8       flags = 0;
    1108             : 
    1109          92 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
    1110             : 
    1111             :     Assert(TransactionIdIsValid(txn->xid));
    1112             : 
    1113             :     /* transaction ID */
    1114          92 :     pq_sendint32(out, txn->xid);
    1115             : 
    1116             :     /* send the flags field (unused for now) */
    1117          92 :     pq_sendbyte(out, flags);
    1118             : 
    1119             :     /* send fields */
    1120          92 :     pq_sendint64(out, commit_lsn);
    1121          92 :     pq_sendint64(out, txn->end_lsn);
    1122          92 :     pq_sendint64(out, txn->xact_time.commit_time);
    1123          92 : }
    1124             : 
    1125             : /*
    1126             :  * Read STREAM COMMIT from the output stream.
    1127             :  */
    1128             : TransactionId
    1129         122 : logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
    1130             : {
    1131             :     TransactionId xid;
    1132             :     uint8       flags;
    1133             : 
    1134         122 :     xid = pq_getmsgint(in, 4);
    1135             : 
    1136             :     /* read flags (unused for now) */
    1137         122 :     flags = pq_getmsgbyte(in);
    1138             : 
    1139         122 :     if (flags != 0)
    1140           0 :         elog(ERROR, "unrecognized flags %u in commit message", flags);
    1141             : 
    1142             :     /* read fields */
    1143         122 :     commit_data->commit_lsn = pq_getmsgint64(in);
    1144         122 :     commit_data->end_lsn = pq_getmsgint64(in);
    1145         122 :     commit_data->committime = pq_getmsgint64(in);
    1146             : 
    1147         122 :     return xid;
    1148             : }
    1149             : 
    1150             : /*
    1151             :  * Write STREAM ABORT to the output stream. Note that xid and subxid will be
    1152             :  * same for the top-level transaction abort.
    1153             :  *
    1154             :  * If write_abort_info is true, send the abort_lsn and abort_time fields,
    1155             :  * otherwise don't.
    1156             :  */
    1157             : void
    1158          52 : logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
    1159             :                               TransactionId subxid, XLogRecPtr abort_lsn,
    1160             :                               TimestampTz abort_time, bool write_abort_info)
    1161             : {
    1162          52 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
    1163             : 
    1164             :     Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
    1165             : 
    1166             :     /* transaction ID */
    1167          52 :     pq_sendint32(out, xid);
    1168          52 :     pq_sendint32(out, subxid);
    1169             : 
    1170          52 :     if (write_abort_info)
    1171             :     {
    1172          24 :         pq_sendint64(out, abort_lsn);
    1173          24 :         pq_sendint64(out, abort_time);
    1174             :     }
    1175          52 : }
    1176             : 
    1177             : /*
    1178             :  * Read STREAM ABORT from the output stream.
    1179             :  *
    1180             :  * If read_abort_info is true, read the abort_lsn and abort_time fields,
    1181             :  * otherwise don't.
    1182             :  */
    1183             : void
    1184          76 : logicalrep_read_stream_abort(StringInfo in,
    1185             :                              LogicalRepStreamAbortData *abort_data,
    1186             :                              bool read_abort_info)
    1187             : {
    1188             :     Assert(abort_data);
    1189             : 
    1190          76 :     abort_data->xid = pq_getmsgint(in, 4);
    1191          76 :     abort_data->subxid = pq_getmsgint(in, 4);
    1192             : 
    1193          76 :     if (read_abort_info)
    1194             :     {
    1195          48 :         abort_data->abort_lsn = pq_getmsgint64(in);
    1196          48 :         abort_data->abort_time = pq_getmsgint64(in);
    1197             :     }
    1198             :     else
    1199             :     {
    1200          28 :         abort_data->abort_lsn = InvalidXLogRecPtr;
    1201          28 :         abort_data->abort_time = 0;
    1202             :     }
    1203          76 : }
    1204             : 
    1205             : /*
    1206             :  * Get string representing LogicalRepMsgType.
    1207             :  */
    1208             : const char *
    1209         712 : logicalrep_message_type(LogicalRepMsgType action)
    1210             : {
    1211             :     static char err_unknown[20];
    1212             : 
    1213         712 :     switch (action)
    1214             :     {
    1215           2 :         case LOGICAL_REP_MSG_BEGIN:
    1216           2 :             return "BEGIN";
    1217           2 :         case LOGICAL_REP_MSG_COMMIT:
    1218           2 :             return "COMMIT";
    1219           0 :         case LOGICAL_REP_MSG_ORIGIN:
    1220           0 :             return "ORIGIN";
    1221          66 :         case LOGICAL_REP_MSG_INSERT:
    1222          66 :             return "INSERT";
    1223          28 :         case LOGICAL_REP_MSG_UPDATE:
    1224          28 :             return "UPDATE";
    1225          22 :         case LOGICAL_REP_MSG_DELETE:
    1226          22 :             return "DELETE";
    1227           0 :         case LOGICAL_REP_MSG_TRUNCATE:
    1228           0 :             return "TRUNCATE";
    1229           4 :         case LOGICAL_REP_MSG_RELATION:
    1230           4 :             return "RELATION";
    1231           0 :         case LOGICAL_REP_MSG_TYPE:
    1232           0 :             return "TYPE";
    1233           0 :         case LOGICAL_REP_MSG_MESSAGE:
    1234           0 :             return "MESSAGE";
    1235           2 :         case LOGICAL_REP_MSG_BEGIN_PREPARE:
    1236           2 :             return "BEGIN PREPARE";
    1237           4 :         case LOGICAL_REP_MSG_PREPARE:
    1238           4 :             return "PREPARE";
    1239           0 :         case LOGICAL_REP_MSG_COMMIT_PREPARED:
    1240           0 :             return "COMMIT PREPARED";
    1241           0 :         case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
    1242           0 :             return "ROLLBACK PREPARED";
    1243          26 :         case LOGICAL_REP_MSG_STREAM_START:
    1244          26 :             return "STREAM START";
    1245         474 :         case LOGICAL_REP_MSG_STREAM_STOP:
    1246         474 :             return "STREAM STOP";
    1247          40 :         case LOGICAL_REP_MSG_STREAM_COMMIT:
    1248          40 :             return "STREAM COMMIT";
    1249          38 :         case LOGICAL_REP_MSG_STREAM_ABORT:
    1250          38 :             return "STREAM ABORT";
    1251           4 :         case LOGICAL_REP_MSG_STREAM_PREPARE:
    1252           4 :             return "STREAM PREPARE";
    1253             :     }
    1254             : 
    1255             :     /*
    1256             :      * This message provides context in the error raised when applying a
    1257             :      * logical message. So we can't throw an error here. Return an unknown
    1258             :      * indicator value so that the original error is still reported.
    1259             :      */
    1260           0 :     snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);
    1261             : 
    1262           0 :     return err_unknown;
    1263             : }
    1264             : 
    1265             : /*
    1266             :  * Check if the column 'att' of a table should be published.
    1267             :  *
    1268             :  * 'columns' represents the publication column list (if any) for that table.
    1269             :  *
    1270             :  * 'include_gencols_type' value indicates whether generated columns should be
    1271             :  * published when there is no column list. Typically, this will have the same
    1272             :  * value as the 'publish_generated_columns' publication parameter.
    1273             :  *
    1274             :  * Note that generated columns can be published only when present in a
    1275             :  * publication column list, or when include_gencols_type is
    1276             :  * PUBLISH_GENCOLS_STORED.
    1277             :  */
    1278             : bool
    1279     1468840 : logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns,
    1280             :                                  PublishGencolsType include_gencols_type)
    1281             : {
    1282     1468840 :     if (att->attisdropped)
    1283         102 :         return false;
    1284             : 
    1285             :     /* If a column list is provided, publish only the cols in that list. */
    1286     1468738 :     if (columns)
    1287        1898 :         return bms_is_member(att->attnum, columns);
    1288             : 
    1289             :     /* All non-generated columns are always published. */
    1290     1466840 :     if (!att->attgenerated)
    1291     1466726 :         return true;
    1292             : 
    1293             :     /*
    1294             :      * Stored generated columns are only published when the user sets
    1295             :      * publish_generated_columns as stored.
    1296             :      */
    1297         114 :     if (att->attgenerated == ATTRIBUTE_GENERATED_STORED)
    1298          66 :         return include_gencols_type == PUBLISH_GENCOLS_STORED;
    1299             : 
    1300          48 :     return false;
    1301             : }

Generated by: LCOV version 1.14