LCOV - code coverage report
Current view: top level - src/backend/replication/logical - proto.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 451 497 90.7 %
Date: 2024-04-24 06:11:09 Functions: 45 46 97.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * proto.c
       4             :  *      logical replication protocol functions
       5             :  *
       6             :  * Copyright (c) 2015-2024, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *      src/backend/replication/logical/proto.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "access/sysattr.h"
      16             : #include "catalog/pg_namespace.h"
      17             : #include "catalog/pg_type.h"
      18             : #include "libpq/pqformat.h"
      19             : #include "replication/logicalproto.h"
      20             : #include "utils/lsyscache.h"
      21             : #include "utils/syscache.h"
      22             : 
      23             : /*
      24             :  * Protocol message flags.
      25             :  */
      26             : #define LOGICALREP_IS_REPLICA_IDENTITY 1
      27             : 
      28             : #define MESSAGE_TRANSACTIONAL (1<<0)
      29             : #define TRUNCATE_CASCADE        (1<<0)
      30             : #define TRUNCATE_RESTART_SEQS   (1<<1)
      31             : 
      32             : static void logicalrep_write_attrs(StringInfo out, Relation rel,
      33             :                                    Bitmapset *columns);
      34             : static void logicalrep_write_tuple(StringInfo out, Relation rel,
      35             :                                    TupleTableSlot *slot,
      36             :                                    bool binary, Bitmapset *columns);
      37             : static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
      38             : static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
      39             : 
      40             : static void logicalrep_write_namespace(StringInfo out, Oid nspid);
      41             : static const char *logicalrep_read_namespace(StringInfo in);
      42             : 
      43             : /*
      44             :  * Check if a column is covered by a column list.
      45             :  *
      46             :  * Need to be careful about NULL, which is treated as a column list covering
      47             :  * all columns.
      48             :  */
      49             : static bool
      50     1465868 : column_in_column_list(int attnum, Bitmapset *columns)
      51             : {
      52     1465868 :     return (columns == NULL || bms_is_member(attnum, columns));
      53             : }
      54             : 
      55             : 
      56             : /*
      57             :  * Write BEGIN to the output stream.
      58             :  */
      59             : void
      60         708 : logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
      61             : {
      62         708 :     pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
      63             : 
      64             :     /* fixed fields */
      65         708 :     pq_sendint64(out, txn->final_lsn);
      66         708 :     pq_sendint64(out, txn->xact_time.commit_time);
      67         708 :     pq_sendint32(out, txn->xid);
      68         708 : }
      69             : 
      70             : /*
      71             :  * Read transaction BEGIN from the stream.
      72             :  */
      73             : void
      74         814 : logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
      75             : {
      76             :     /* read fields */
      77         814 :     begin_data->final_lsn = pq_getmsgint64(in);
      78         814 :     if (begin_data->final_lsn == InvalidXLogRecPtr)
      79           0 :         elog(ERROR, "final_lsn not set in begin message");
      80         814 :     begin_data->committime = pq_getmsgint64(in);
      81         814 :     begin_data->xid = pq_getmsgint(in, 4);
      82         814 : }
      83             : 
      84             : 
      85             : /*
      86             :  * Write COMMIT to the output stream.
      87             :  */
      88             : void
      89         708 : logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
      90             :                         XLogRecPtr commit_lsn)
      91             : {
      92         708 :     uint8       flags = 0;
      93             : 
      94         708 :     pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
      95             : 
      96             :     /* send the flags field (unused for now) */
      97         708 :     pq_sendbyte(out, flags);
      98             : 
      99             :     /* send fields */
     100         708 :     pq_sendint64(out, commit_lsn);
     101         708 :     pq_sendint64(out, txn->end_lsn);
     102         708 :     pq_sendint64(out, txn->xact_time.commit_time);
     103         708 : }
     104             : 
     105             : /*
     106             :  * Read transaction COMMIT from the stream.
     107             :  */
     108             : void
     109         760 : logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
     110             : {
     111             :     /* read flags (unused for now) */
     112         760 :     uint8       flags = pq_getmsgbyte(in);
     113             : 
     114         760 :     if (flags != 0)
     115           0 :         elog(ERROR, "unrecognized flags %u in commit message", flags);
     116             : 
     117             :     /* read fields */
     118         760 :     commit_data->commit_lsn = pq_getmsgint64(in);
     119         760 :     commit_data->end_lsn = pq_getmsgint64(in);
     120         760 :     commit_data->committime = pq_getmsgint64(in);
     121         760 : }
     122             : 
     123             : /*
     124             :  * Write BEGIN PREPARE to the output stream.
     125             :  */
     126             : void
     127          36 : logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
     128             : {
     129          36 :     pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE);
     130             : 
     131             :     /* fixed fields */
     132          36 :     pq_sendint64(out, txn->final_lsn);
     133          36 :     pq_sendint64(out, txn->end_lsn);
     134          36 :     pq_sendint64(out, txn->xact_time.prepare_time);
     135          36 :     pq_sendint32(out, txn->xid);
     136             : 
     137             :     /* send gid */
     138          36 :     pq_sendstring(out, txn->gid);
     139          36 : }
     140             : 
     141             : /*
     142             :  * Read transaction BEGIN PREPARE from the stream.
     143             :  */
     144             : void
     145          28 : logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
     146             : {
     147             :     /* read fields */
     148          28 :     begin_data->prepare_lsn = pq_getmsgint64(in);
     149          28 :     if (begin_data->prepare_lsn == InvalidXLogRecPtr)
     150           0 :         elog(ERROR, "prepare_lsn not set in begin prepare message");
     151          28 :     begin_data->end_lsn = pq_getmsgint64(in);
     152          28 :     if (begin_data->end_lsn == InvalidXLogRecPtr)
     153           0 :         elog(ERROR, "end_lsn not set in begin prepare message");
     154          28 :     begin_data->prepare_time = pq_getmsgint64(in);
     155          28 :     begin_data->xid = pq_getmsgint(in, 4);
     156             : 
     157             :     /* read gid (copy it into a pre-allocated buffer) */
     158          28 :     strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
     159          28 : }
     160             : 
     161             : /*
     162             :  * The core functionality for logicalrep_write_prepare and
     163             :  * logicalrep_write_stream_prepare.
     164             :  */
     165             : static void
     166          64 : logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
     167             :                                 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
     168             : {
     169          64 :     uint8       flags = 0;
     170             : 
     171          64 :     pq_sendbyte(out, type);
     172             : 
     173             :     /*
     174             :      * This should only ever happen for two-phase commit transactions, in
     175             :      * which case we expect to have a valid GID.
     176             :      */
     177             :     Assert(txn->gid != NULL);
     178             :     Assert(rbtxn_prepared(txn));
     179             :     Assert(TransactionIdIsValid(txn->xid));
     180             : 
     181             :     /* send the flags field */
     182          64 :     pq_sendbyte(out, flags);
     183             : 
     184             :     /* send fields */
     185          64 :     pq_sendint64(out, prepare_lsn);
     186          64 :     pq_sendint64(out, txn->end_lsn);
     187          64 :     pq_sendint64(out, txn->xact_time.prepare_time);
     188          64 :     pq_sendint32(out, txn->xid);
     189             : 
     190             :     /* send gid */
     191          64 :     pq_sendstring(out, txn->gid);
     192          64 : }
     193             : 
     194             : /*
     195             :  * Write PREPARE to the output stream.
     196             :  */
     197             : void
     198          36 : logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
     199             :                          XLogRecPtr prepare_lsn)
     200             : {
     201          36 :     logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
     202             :                                     txn, prepare_lsn);
     203          36 : }
     204             : 
     205             : /*
     206             :  * The core functionality for logicalrep_read_prepare and
     207             :  * logicalrep_read_stream_prepare.
     208             :  */
     209             : static void
     210          48 : logicalrep_read_prepare_common(StringInfo in, char *msgtype,
     211             :                                LogicalRepPreparedTxnData *prepare_data)
     212             : {
     213             :     /* read flags */
     214          48 :     uint8       flags = pq_getmsgbyte(in);
     215             : 
     216          48 :     if (flags != 0)
     217           0 :         elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
     218             : 
     219             :     /* read fields */
     220          48 :     prepare_data->prepare_lsn = pq_getmsgint64(in);
     221          48 :     if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
     222           0 :         elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
     223          48 :     prepare_data->end_lsn = pq_getmsgint64(in);
     224          48 :     if (prepare_data->end_lsn == InvalidXLogRecPtr)
     225           0 :         elog(ERROR, "end_lsn is not set in %s message", msgtype);
     226          48 :     prepare_data->prepare_time = pq_getmsgint64(in);
     227          48 :     prepare_data->xid = pq_getmsgint(in, 4);
     228          48 :     if (prepare_data->xid == InvalidTransactionId)
     229           0 :         elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
     230             : 
     231             :     /* read gid (copy it into a pre-allocated buffer) */
     232          48 :     strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
     233          48 : }
     234             : 
     235             : /*
     236             :  * Read transaction PREPARE from the stream.
     237             :  */
     238             : void
     239          26 : logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
     240             : {
     241          26 :     logicalrep_read_prepare_common(in, "prepare", prepare_data);
     242          26 : }
     243             : 
     244             : /*
     245             :  * Write COMMIT PREPARED to the output stream.
     246             :  */
     247             : void
     248          46 : logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
     249             :                                  XLogRecPtr commit_lsn)
     250             : {
     251          46 :     uint8       flags = 0;
     252             : 
     253          46 :     pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
     254             : 
     255             :     /*
     256             :      * This should only ever happen for two-phase commit transactions, in
     257             :      * which case we expect to have a valid GID.
     258             :      */
     259             :     Assert(txn->gid != NULL);
     260             : 
     261             :     /* send the flags field */
     262          46 :     pq_sendbyte(out, flags);
     263             : 
     264             :     /* send fields */
     265          46 :     pq_sendint64(out, commit_lsn);
     266          46 :     pq_sendint64(out, txn->end_lsn);
     267          46 :     pq_sendint64(out, txn->xact_time.commit_time);
     268          46 :     pq_sendint32(out, txn->xid);
     269             : 
     270             :     /* send gid */
     271          46 :     pq_sendstring(out, txn->gid);
     272          46 : }
     273             : 
     274             : /*
     275             :  * Read transaction COMMIT PREPARED from the stream.
     276             :  */
     277             : void
     278          38 : logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
     279             : {
     280             :     /* read flags */
     281          38 :     uint8       flags = pq_getmsgbyte(in);
     282             : 
     283          38 :     if (flags != 0)
     284           0 :         elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
     285             : 
     286             :     /* read fields */
     287          38 :     prepare_data->commit_lsn = pq_getmsgint64(in);
     288          38 :     if (prepare_data->commit_lsn == InvalidXLogRecPtr)
     289           0 :         elog(ERROR, "commit_lsn is not set in commit prepared message");
     290          38 :     prepare_data->end_lsn = pq_getmsgint64(in);
     291          38 :     if (prepare_data->end_lsn == InvalidXLogRecPtr)
     292           0 :         elog(ERROR, "end_lsn is not set in commit prepared message");
     293          38 :     prepare_data->commit_time = pq_getmsgint64(in);
     294          38 :     prepare_data->xid = pq_getmsgint(in, 4);
     295             : 
     296             :     /* read gid (copy it into a pre-allocated buffer) */
     297          38 :     strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
     298          38 : }
     299             : 
     300             : /*
     301             :  * Write ROLLBACK PREPARED to the output stream.
     302             :  */
     303             : void
     304          18 : logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
     305             :                                    XLogRecPtr prepare_end_lsn,
     306             :                                    TimestampTz prepare_time)
     307             : {
     308          18 :     uint8       flags = 0;
     309             : 
     310          18 :     pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
     311             : 
     312             :     /*
     313             :      * This should only ever happen for two-phase commit transactions, in
     314             :      * which case we expect to have a valid GID.
     315             :      */
     316             :     Assert(txn->gid != NULL);
     317             : 
     318             :     /* send the flags field */
     319          18 :     pq_sendbyte(out, flags);
     320             : 
     321             :     /* send fields */
     322          18 :     pq_sendint64(out, prepare_end_lsn);
     323          18 :     pq_sendint64(out, txn->end_lsn);
     324          18 :     pq_sendint64(out, prepare_time);
     325          18 :     pq_sendint64(out, txn->xact_time.commit_time);
     326          18 :     pq_sendint32(out, txn->xid);
     327             : 
     328             :     /* send gid */
     329          18 :     pq_sendstring(out, txn->gid);
     330          18 : }
     331             : 
     332             : /*
     333             :  * Read transaction ROLLBACK PREPARED from the stream.
     334             :  */
     335             : void
     336          10 : logicalrep_read_rollback_prepared(StringInfo in,
     337             :                                   LogicalRepRollbackPreparedTxnData *rollback_data)
     338             : {
     339             :     /* read flags */
     340          10 :     uint8       flags = pq_getmsgbyte(in);
     341             : 
     342          10 :     if (flags != 0)
     343           0 :         elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
     344             : 
     345             :     /* read fields */
     346          10 :     rollback_data->prepare_end_lsn = pq_getmsgint64(in);
     347          10 :     if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
     348           0 :         elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
     349          10 :     rollback_data->rollback_end_lsn = pq_getmsgint64(in);
     350          10 :     if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
     351           0 :         elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
     352          10 :     rollback_data->prepare_time = pq_getmsgint64(in);
     353          10 :     rollback_data->rollback_time = pq_getmsgint64(in);
     354          10 :     rollback_data->xid = pq_getmsgint(in, 4);
     355             : 
     356             :     /* read gid (copy it into a pre-allocated buffer) */
     357          10 :     strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
     358          10 : }
     359             : 
     360             : /*
     361             :  * Write STREAM PREPARE to the output stream.
     362             :  */
     363             : void
     364          28 : logicalrep_write_stream_prepare(StringInfo out,
     365             :                                 ReorderBufferTXN *txn,
     366             :                                 XLogRecPtr prepare_lsn)
     367             : {
     368          28 :     logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE,
     369             :                                     txn, prepare_lsn);
     370          28 : }
     371             : 
     372             : /*
     373             :  * Read STREAM PREPARE from the stream.
     374             :  */
     375             : void
     376          22 : logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
     377             : {
     378          22 :     logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
     379          22 : }
     380             : 
     381             : /*
     382             :  * Write ORIGIN to the output stream.
     383             :  */
     384             : void
     385          18 : logicalrep_write_origin(StringInfo out, const char *origin,
     386             :                         XLogRecPtr origin_lsn)
     387             : {
     388          18 :     pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
     389             : 
     390             :     /* fixed fields */
     391          18 :     pq_sendint64(out, origin_lsn);
     392             : 
     393             :     /* origin string */
     394          18 :     pq_sendstring(out, origin);
     395          18 : }
     396             : 
     397             : /*
     398             :  * Read ORIGIN from the output stream.
     399             :  */
     400             : char *
     401           0 : logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
     402             : {
     403             :     /* fixed fields */
     404           0 :     *origin_lsn = pq_getmsgint64(in);
     405             : 
     406             :     /* return origin */
     407           0 :     return pstrdup(pq_getmsgstring(in));
     408             : }
     409             : 
     410             : /*
     411             :  * Write INSERT to the output stream.
     412             :  */
     413             : void
     414      211648 : logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
     415             :                         TupleTableSlot *newslot, bool binary, Bitmapset *columns)
     416             : {
     417      211648 :     pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
     418             : 
     419             :     /* transaction ID (if not valid, we're not streaming) */
     420      211648 :     if (TransactionIdIsValid(xid))
     421      200168 :         pq_sendint32(out, xid);
     422             : 
     423             :     /* use Oid as relation identifier */
     424      211648 :     pq_sendint32(out, RelationGetRelid(rel));
     425             : 
     426      211648 :     pq_sendbyte(out, 'N');      /* new tuple follows */
     427      211648 :     logicalrep_write_tuple(out, rel, newslot, binary, columns);
     428      211648 : }
     429             : 
     430             : /*
     431             :  * Read INSERT from stream.
     432             :  *
     433             :  * Fills the new tuple.
     434             :  */
     435             : LogicalRepRelId
     436      152542 : logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
     437             : {
     438             :     char        action;
     439             :     LogicalRepRelId relid;
     440             : 
     441             :     /* read the relation id */
     442      152542 :     relid = pq_getmsgint(in, 4);
     443             : 
     444      152542 :     action = pq_getmsgbyte(in);
     445      152542 :     if (action != 'N')
     446           0 :         elog(ERROR, "expected new tuple but got %d",
     447             :              action);
     448             : 
     449      152542 :     logicalrep_read_tuple(in, newtup);
     450             : 
     451      152542 :     return relid;
     452             : }
     453             : 
     454             : /*
     455             :  * Write UPDATE to the output stream.
     456             :  */
     457             : void
     458       68836 : logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
     459             :                         TupleTableSlot *oldslot, TupleTableSlot *newslot,
     460             :                         bool binary, Bitmapset *columns)
     461             : {
     462       68836 :     pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
     463             : 
     464             :     Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
     465             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     466             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
     467             : 
     468             :     /* transaction ID (if not valid, we're not streaming) */
     469       68836 :     if (TransactionIdIsValid(xid))
     470       68464 :         pq_sendint32(out, xid);
     471             : 
     472             :     /* use Oid as relation identifier */
     473       68836 :     pq_sendint32(out, RelationGetRelid(rel));
     474             : 
     475       68836 :     if (oldslot != NULL)
     476             :     {
     477         222 :         if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     478         102 :             pq_sendbyte(out, 'O');  /* old tuple follows */
     479             :         else
     480         120 :             pq_sendbyte(out, 'K');  /* old key follows */
     481         222 :         logicalrep_write_tuple(out, rel, oldslot, binary, columns);
     482             :     }
     483             : 
     484       68836 :     pq_sendbyte(out, 'N');      /* new tuple follows */
     485       68836 :     logicalrep_write_tuple(out, rel, newslot, binary, columns);
     486       68836 : }
     487             : 
     488             : /*
     489             :  * Read UPDATE from stream.
     490             :  */
     491             : LogicalRepRelId
     492       63844 : logicalrep_read_update(StringInfo in, bool *has_oldtuple,
     493             :                        LogicalRepTupleData *oldtup,
     494             :                        LogicalRepTupleData *newtup)
     495             : {
     496             :     char        action;
     497             :     LogicalRepRelId relid;
     498             : 
     499             :     /* read the relation id */
     500       63844 :     relid = pq_getmsgint(in, 4);
     501             : 
     502             :     /* read and verify action */
     503       63844 :     action = pq_getmsgbyte(in);
     504       63844 :     if (action != 'K' && action != 'O' && action != 'N')
     505           0 :         elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
     506             :              action);
     507             : 
     508             :     /* check for old tuple */
     509       63844 :     if (action == 'K' || action == 'O')
     510             :     {
     511         246 :         logicalrep_read_tuple(in, oldtup);
     512         246 :         *has_oldtuple = true;
     513             : 
     514         246 :         action = pq_getmsgbyte(in);
     515             :     }
     516             :     else
     517       63598 :         *has_oldtuple = false;
     518             : 
     519             :     /* check for new  tuple */
     520       63844 :     if (action != 'N')
     521           0 :         elog(ERROR, "expected action 'N', got %c",
     522             :              action);
     523             : 
     524       63844 :     logicalrep_read_tuple(in, newtup);
     525             : 
     526       63844 :     return relid;
     527             : }
     528             : 
     529             : /*
     530             :  * Write DELETE to the output stream.
     531             :  */
     532             : void
     533       83720 : logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
     534             :                         TupleTableSlot *oldslot, bool binary,
     535             :                         Bitmapset *columns)
     536             : {
     537             :     Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
     538             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     539             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
     540             : 
     541       83720 :     pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
     542             : 
     543             :     /* transaction ID (if not valid, we're not streaming) */
     544       83720 :     if (TransactionIdIsValid(xid))
     545       83250 :         pq_sendint32(out, xid);
     546             : 
     547             :     /* use Oid as relation identifier */
     548       83720 :     pq_sendint32(out, RelationGetRelid(rel));
     549             : 
     550       83720 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     551         240 :         pq_sendbyte(out, 'O');  /* old tuple follows */
     552             :     else
     553       83480 :         pq_sendbyte(out, 'K');  /* old key follows */
     554             : 
     555       83720 :     logicalrep_write_tuple(out, rel, oldslot, binary, columns);
     556       83720 : }
     557             : 
     558             : /*
     559             :  * Read DELETE from stream.
     560             :  *
     561             :  * Fills the old tuple.
     562             :  */
     563             : LogicalRepRelId
     564       80610 : logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
     565             : {
     566             :     char        action;
     567             :     LogicalRepRelId relid;
     568             : 
     569             :     /* read the relation id */
     570       80610 :     relid = pq_getmsgint(in, 4);
     571             : 
     572             :     /* read and verify action */
     573       80610 :     action = pq_getmsgbyte(in);
     574       80610 :     if (action != 'K' && action != 'O')
     575           0 :         elog(ERROR, "expected action 'O' or 'K', got %c", action);
     576             : 
     577       80610 :     logicalrep_read_tuple(in, oldtup);
     578             : 
     579       80610 :     return relid;
     580             : }
     581             : 
     582             : /*
     583             :  * Write TRUNCATE to the output stream.
     584             :  */
     585             : void
     586          16 : logicalrep_write_truncate(StringInfo out,
     587             :                           TransactionId xid,
     588             :                           int nrelids,
     589             :                           Oid relids[],
     590             :                           bool cascade, bool restart_seqs)
     591             : {
     592             :     int         i;
     593          16 :     uint8       flags = 0;
     594             : 
     595          16 :     pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
     596             : 
     597             :     /* transaction ID (if not valid, we're not streaming) */
     598          16 :     if (TransactionIdIsValid(xid))
     599           0 :         pq_sendint32(out, xid);
     600             : 
     601          16 :     pq_sendint32(out, nrelids);
     602             : 
     603             :     /* encode and send truncate flags */
     604          16 :     if (cascade)
     605           0 :         flags |= TRUNCATE_CASCADE;
     606          16 :     if (restart_seqs)
     607           0 :         flags |= TRUNCATE_RESTART_SEQS;
     608          16 :     pq_sendint8(out, flags);
     609             : 
     610          42 :     for (i = 0; i < nrelids; i++)
     611          26 :         pq_sendint32(out, relids[i]);
     612          16 : }
     613             : 
     614             : /*
     615             :  * Read TRUNCATE from stream.
     616             :  */
     617             : List *
     618          38 : logicalrep_read_truncate(StringInfo in,
     619             :                          bool *cascade, bool *restart_seqs)
     620             : {
     621             :     int         i;
     622             :     int         nrelids;
     623          38 :     List       *relids = NIL;
     624             :     uint8       flags;
     625             : 
     626          38 :     nrelids = pq_getmsgint(in, 4);
     627             : 
     628             :     /* read and decode truncate flags */
     629          38 :     flags = pq_getmsgint(in, 1);
     630          38 :     *cascade = (flags & TRUNCATE_CASCADE) > 0;
     631          38 :     *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
     632             : 
     633          94 :     for (i = 0; i < nrelids; i++)
     634          56 :         relids = lappend_oid(relids, pq_getmsgint(in, 4));
     635             : 
     636          38 :     return relids;
     637             : }
     638             : 
     639             : /*
     640             :  * Write MESSAGE to stream
     641             :  */
     642             : void
     643          10 : logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
     644             :                          bool transactional, const char *prefix, Size sz,
     645             :                          const char *message)
     646             : {
     647          10 :     uint8       flags = 0;
     648             : 
     649          10 :     pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
     650             : 
     651             :     /* encode and send message flags */
     652          10 :     if (transactional)
     653           4 :         flags |= MESSAGE_TRANSACTIONAL;
     654             : 
     655             :     /* transaction ID (if not valid, we're not streaming) */
     656          10 :     if (TransactionIdIsValid(xid))
     657           0 :         pq_sendint32(out, xid);
     658             : 
     659          10 :     pq_sendint8(out, flags);
     660          10 :     pq_sendint64(out, lsn);
     661          10 :     pq_sendstring(out, prefix);
     662          10 :     pq_sendint32(out, sz);
     663          10 :     pq_sendbytes(out, message, sz);
     664          10 : }
     665             : 
     666             : /*
     667             :  * Write relation description to the output stream.
     668             :  */
     669             : void
     670         618 : logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
     671             :                      Bitmapset *columns)
     672             : {
     673             :     char       *relname;
     674             : 
     675         618 :     pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
     676             : 
     677             :     /* transaction ID (if not valid, we're not streaming) */
     678         618 :     if (TransactionIdIsValid(xid))
     679         142 :         pq_sendint32(out, xid);
     680             : 
     681             :     /* use Oid as relation identifier */
     682         618 :     pq_sendint32(out, RelationGetRelid(rel));
     683             : 
     684             :     /* send qualified relation name */
     685         618 :     logicalrep_write_namespace(out, RelationGetNamespace(rel));
     686         618 :     relname = RelationGetRelationName(rel);
     687         618 :     pq_sendstring(out, relname);
     688             : 
     689             :     /* send replica identity */
     690         618 :     pq_sendbyte(out, rel->rd_rel->relreplident);
     691             : 
     692             :     /* send the attribute info */
     693         618 :     logicalrep_write_attrs(out, rel, columns);
     694         618 : }
     695             : 
     696             : /*
     697             :  * Read the relation info from stream and return as LogicalRepRelation.
     698             :  */
     699             : LogicalRepRelation *
     700         756 : logicalrep_read_rel(StringInfo in)
     701             : {
     702         756 :     LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
     703             : 
     704         756 :     rel->remoteid = pq_getmsgint(in, 4);
     705             : 
     706             :     /* Read relation name from stream */
     707         756 :     rel->nspname = pstrdup(logicalrep_read_namespace(in));
     708         756 :     rel->relname = pstrdup(pq_getmsgstring(in));
     709             : 
     710             :     /* Read the replica identity. */
     711         756 :     rel->replident = pq_getmsgbyte(in);
     712             : 
     713             :     /* Get attribute description */
     714         756 :     logicalrep_read_attrs(in, rel);
     715             : 
     716         756 :     return rel;
     717             : }
     718             : 
     719             : /*
     720             :  * Write type info to the output stream.
     721             :  *
     722             :  * This function will always write base type info.
     723             :  */
     724             : void
     725          36 : logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
     726             : {
     727          36 :     Oid         basetypoid = getBaseType(typoid);
     728             :     HeapTuple   tup;
     729             :     Form_pg_type typtup;
     730             : 
     731          36 :     pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
     732             : 
     733             :     /* transaction ID (if not valid, we're not streaming) */
     734          36 :     if (TransactionIdIsValid(xid))
     735           0 :         pq_sendint32(out, xid);
     736             : 
     737          36 :     tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
     738          36 :     if (!HeapTupleIsValid(tup))
     739           0 :         elog(ERROR, "cache lookup failed for type %u", basetypoid);
     740          36 :     typtup = (Form_pg_type) GETSTRUCT(tup);
     741             : 
     742             :     /* use Oid as type identifier */
     743          36 :     pq_sendint32(out, typoid);
     744             : 
     745             :     /* send qualified type name */
     746          36 :     logicalrep_write_namespace(out, typtup->typnamespace);
     747          36 :     pq_sendstring(out, NameStr(typtup->typname));
     748             : 
     749          36 :     ReleaseSysCache(tup);
     750          36 : }
     751             : 
     752             : /*
     753             :  * Read type info from the input stream.
     754             :  */
     755             : void
     756          36 : logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
     757             : {
     758          36 :     ltyp->remoteid = pq_getmsgint(in, 4);
     759             : 
     760             :     /* Read type name from stream */
     761          36 :     ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
     762          36 :     ltyp->typname = pstrdup(pq_getmsgstring(in));
     763          36 : }
     764             : 
     765             : /*
     766             :  * Write a tuple to the output stream, in the most efficient format possible.
     767             :  */
     768             : static void
     769      364426 : logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
     770             :                        bool binary, Bitmapset *columns)
     771             : {
     772             :     TupleDesc   desc;
     773             :     Datum      *values;
     774             :     bool       *isnull;
     775             :     int         i;
     776      364426 :     uint16      nliveatts = 0;
     777             : 
     778      364426 :     desc = RelationGetDescr(rel);
     779             : 
     780     1096160 :     for (i = 0; i < desc->natts; i++)
     781             :     {
     782      731734 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     783             : 
     784      731734 :         if (att->attisdropped || att->attgenerated)
     785          16 :             continue;
     786             : 
     787      731718 :         if (!column_in_column_list(att->attnum, columns))
     788         204 :             continue;
     789             : 
     790      731514 :         nliveatts++;
     791             :     }
     792      364426 :     pq_sendint16(out, nliveatts);
     793             : 
     794      364426 :     slot_getallattrs(slot);
     795      364426 :     values = slot->tts_values;
     796      364426 :     isnull = slot->tts_isnull;
     797             : 
     798             :     /* Write the values */
     799     1096160 :     for (i = 0; i < desc->natts; i++)
     800             :     {
     801             :         HeapTuple   typtup;
     802             :         Form_pg_type typclass;
     803      731734 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     804             : 
     805      731734 :         if (att->attisdropped || att->attgenerated)
     806          16 :             continue;
     807             : 
     808      731718 :         if (!column_in_column_list(att->attnum, columns))
     809         204 :             continue;
     810             : 
     811      731514 :         if (isnull[i])
     812             :         {
     813      103746 :             pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
     814      103746 :             continue;
     815             :         }
     816             : 
     817      627768 :         if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
     818             :         {
     819             :             /*
     820             :              * Unchanged toasted datum.  (Note that we don't promise to detect
     821             :              * unchanged data in general; this is just a cheap check to avoid
     822             :              * sending large values unnecessarily.)
     823             :              */
     824           6 :             pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
     825           6 :             continue;
     826             :         }
     827             : 
     828      627762 :         typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
     829      627762 :         if (!HeapTupleIsValid(typtup))
     830           0 :             elog(ERROR, "cache lookup failed for type %u", att->atttypid);
     831      627762 :         typclass = (Form_pg_type) GETSTRUCT(typtup);
     832             : 
     833             :         /*
     834             :          * Send in binary if requested and type has suitable send function.
     835             :          */
     836      627762 :         if (binary && OidIsValid(typclass->typsend))
     837      230086 :         {
     838             :             bytea      *outputbytes;
     839             :             int         len;
     840             : 
     841      230086 :             pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
     842      230086 :             outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
     843      230086 :             len = VARSIZE(outputbytes) - VARHDRSZ;
     844      230086 :             pq_sendint(out, len, 4);    /* length */
     845      230086 :             pq_sendbytes(out, VARDATA(outputbytes), len);   /* data */
     846      230086 :             pfree(outputbytes);
     847             :         }
     848             :         else
     849             :         {
     850             :             char       *outputstr;
     851             : 
     852      397676 :             pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
     853      397676 :             outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
     854      397676 :             pq_sendcountedtext(out, outputstr, strlen(outputstr));
     855      397676 :             pfree(outputstr);
     856             :         }
     857             : 
     858      627762 :         ReleaseSysCache(typtup);
     859             :     }
     860      364426 : }
     861             : 
     862             : /*
     863             :  * Read tuple in logical replication format from stream.
     864             :  */
     865             : static void
     866      297242 : logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
     867             : {
     868             :     int         i;
     869             :     int         natts;
     870             : 
     871             :     /* Get number of attributes */
     872      297242 :     natts = pq_getmsgint(in, 2);
     873             : 
     874             :     /* Allocate space for per-column values; zero out unused StringInfoDatas */
     875      297242 :     tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
     876      297242 :     tuple->colstatus = (char *) palloc(natts * sizeof(char));
     877      297242 :     tuple->ncols = natts;
     878             : 
     879             :     /* Read the data */
     880      903512 :     for (i = 0; i < natts; i++)
     881             :     {
     882             :         char       *buff;
     883             :         char        kind;
     884             :         int         len;
     885      606270 :         StringInfo  value = &tuple->colvalues[i];
     886             : 
     887      606270 :         kind = pq_getmsgbyte(in);
     888      606270 :         tuple->colstatus[i] = kind;
     889             : 
     890      606270 :         switch (kind)
     891             :         {
     892      100712 :             case LOGICALREP_COLUMN_NULL:
     893             :                 /* nothing more to do */
     894      100712 :                 break;
     895           6 :             case LOGICALREP_COLUMN_UNCHANGED:
     896             :                 /* we don't receive the value of an unchanged column */
     897           6 :                 break;
     898      505552 :             case LOGICALREP_COLUMN_TEXT:
     899             :             case LOGICALREP_COLUMN_BINARY:
     900      505552 :                 len = pq_getmsgint(in, 4);  /* read length */
     901             : 
     902             :                 /* and data */
     903      505552 :                 buff = palloc(len + 1);
     904      505552 :                 pq_copymsgbytes(in, buff, len);
     905             : 
     906             :                 /*
     907             :                  * NUL termination is required for LOGICALREP_COLUMN_TEXT mode
     908             :                  * as input functions require that.  For
     909             :                  * LOGICALREP_COLUMN_BINARY it's not technically required, but
     910             :                  * it's harmless.
     911             :                  */
     912      505552 :                 buff[len] = '\0';
     913             : 
     914      505552 :                 initStringInfoFromString(value, buff, len);
     915      505552 :                 break;
     916           0 :             default:
     917           0 :                 elog(ERROR, "unrecognized data representation type '%c'", kind);
     918             :         }
     919             :     }
     920      297242 : }
     921             : 
     922             : /*
     923             :  * Write relation attribute metadata to the stream.
     924             :  */
     925             : static void
     926         618 : logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
     927             : {
     928             :     TupleDesc   desc;
     929             :     int         i;
     930         618 :     uint16      nliveatts = 0;
     931         618 :     Bitmapset  *idattrs = NULL;
     932             :     bool        replidentfull;
     933             : 
     934         618 :     desc = RelationGetDescr(rel);
     935             : 
     936             :     /* send number of live attributes */
     937        1844 :     for (i = 0; i < desc->natts; i++)
     938             :     {
     939        1226 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     940             : 
     941        1226 :         if (att->attisdropped || att->attgenerated)
     942          10 :             continue;
     943             : 
     944        1216 :         if (!column_in_column_list(att->attnum, columns))
     945         106 :             continue;
     946             : 
     947        1110 :         nliveatts++;
     948             :     }
     949         618 :     pq_sendint16(out, nliveatts);
     950             : 
     951             :     /* fetch bitmap of REPLICA IDENTITY attributes */
     952         618 :     replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
     953         618 :     if (!replidentfull)
     954         540 :         idattrs = RelationGetIdentityKeyBitmap(rel);
     955             : 
     956             :     /* send the attributes */
     957        1844 :     for (i = 0; i < desc->natts; i++)
     958             :     {
     959        1226 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     960        1226 :         uint8       flags = 0;
     961             : 
     962        1226 :         if (att->attisdropped || att->attgenerated)
     963          10 :             continue;
     964             : 
     965        1216 :         if (!column_in_column_list(att->attnum, columns))
     966         106 :             continue;
     967             : 
     968             :         /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
     969        2112 :         if (replidentfull ||
     970        1002 :             bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
     971             :                           idattrs))
     972         526 :             flags |= LOGICALREP_IS_REPLICA_IDENTITY;
     973             : 
     974        1110 :         pq_sendbyte(out, flags);
     975             : 
     976             :         /* attribute name */
     977        1110 :         pq_sendstring(out, NameStr(att->attname));
     978             : 
     979             :         /* attribute type id */
     980        1110 :         pq_sendint32(out, (int) att->atttypid);
     981             : 
     982             :         /* attribute type modifier */
     983        1110 :         pq_sendint32(out, att->atttypmod);
     984             :     }
     985             : 
     986         618 :     bms_free(idattrs);
     987         618 : }
     988             : 
     989             : /*
     990             :  * Read relation attribute metadata from the stream.
     991             :  */
     992             : static void
     993         756 : logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
     994             : {
     995             :     int         i;
     996             :     int         natts;
     997             :     char      **attnames;
     998             :     Oid        *atttyps;
     999         756 :     Bitmapset  *attkeys = NULL;
    1000             : 
    1001         756 :     natts = pq_getmsgint(in, 2);
    1002         756 :     attnames = palloc(natts * sizeof(char *));
    1003         756 :     atttyps = palloc(natts * sizeof(Oid));
    1004             : 
    1005             :     /* read the attributes */
    1006        2110 :     for (i = 0; i < natts; i++)
    1007             :     {
    1008             :         uint8       flags;
    1009             : 
    1010             :         /* Check for replica identity column */
    1011        1354 :         flags = pq_getmsgbyte(in);
    1012        1354 :         if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
    1013         634 :             attkeys = bms_add_member(attkeys, i);
    1014             : 
    1015             :         /* attribute name */
    1016        1354 :         attnames[i] = pstrdup(pq_getmsgstring(in));
    1017             : 
    1018             :         /* attribute type id */
    1019        1354 :         atttyps[i] = (Oid) pq_getmsgint(in, 4);
    1020             : 
    1021             :         /* we ignore the attribute type modifier for now */
    1022        1354 :         (void) pq_getmsgint(in, 4);
    1023             :     }
    1024             : 
    1025         756 :     rel->attnames = attnames;
    1026         756 :     rel->atttyps = atttyps;
    1027         756 :     rel->attkeys = attkeys;
    1028         756 :     rel->natts = natts;
    1029         756 : }
    1030             : 
    1031             : /*
    1032             :  * Write the namespace name or empty string for pg_catalog (to save space).
    1033             :  */
    1034             : static void
    1035         654 : logicalrep_write_namespace(StringInfo out, Oid nspid)
    1036             : {
    1037         654 :     if (nspid == PG_CATALOG_NAMESPACE)
    1038           2 :         pq_sendbyte(out, '\0');
    1039             :     else
    1040             :     {
    1041         652 :         char       *nspname = get_namespace_name(nspid);
    1042             : 
    1043         652 :         if (nspname == NULL)
    1044           0 :             elog(ERROR, "cache lookup failed for namespace %u",
    1045             :                  nspid);
    1046             : 
    1047         652 :         pq_sendstring(out, nspname);
    1048             :     }
    1049         654 : }
    1050             : 
    1051             : /*
    1052             :  * Read the namespace name while treating empty string as pg_catalog.
    1053             :  */
    1054             : static const char *
    1055         792 : logicalrep_read_namespace(StringInfo in)
    1056             : {
    1057         792 :     const char *nspname = pq_getmsgstring(in);
    1058             : 
    1059         792 :     if (nspname[0] == '\0')
    1060           2 :         nspname = "pg_catalog";
    1061             : 
    1062         792 :     return nspname;
    1063             : }
    1064             : 
    1065             : /*
    1066             :  * Write the information for the start stream message to the output stream.
    1067             :  */
    1068             : void
    1069        1274 : logicalrep_write_stream_start(StringInfo out,
    1070             :                               TransactionId xid, bool first_segment)
    1071             : {
    1072        1274 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
    1073             : 
    1074             :     Assert(TransactionIdIsValid(xid));
    1075             : 
    1076             :     /* transaction ID (we're starting to stream, so must be valid) */
    1077        1274 :     pq_sendint32(out, xid);
    1078             : 
    1079             :     /* 1 if this is the first streaming segment for this xid */
    1080        1274 :     pq_sendbyte(out, first_segment ? 1 : 0);
    1081        1274 : }
    1082             : 
    1083             : /*
    1084             :  * Read the information about the start stream message from the input stream.
    1085             :  */
    1086             : TransactionId
    1087        1676 : logicalrep_read_stream_start(StringInfo in, bool *first_segment)
    1088             : {
    1089             :     TransactionId xid;
    1090             : 
    1091             :     Assert(first_segment);
    1092             : 
    1093        1676 :     xid = pq_getmsgint(in, 4);
    1094        1676 :     *first_segment = (pq_getmsgbyte(in) == 1);
    1095             : 
    1096        1676 :     return xid;
    1097             : }
    1098             : 
    1099             : /*
    1100             :  * Write the stop stream message to the output stream.
    1101             :  */
    1102             : void
    1103        1274 : logicalrep_write_stream_stop(StringInfo out)
    1104             : {
    1105        1274 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_STOP);
    1106        1274 : }
    1107             : 
    1108             : /*
    1109             :  * Write STREAM COMMIT to the output stream.
    1110             :  */
    1111             : void
    1112          92 : logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
    1113             :                                XLogRecPtr commit_lsn)
    1114             : {
    1115          92 :     uint8       flags = 0;
    1116             : 
    1117          92 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
    1118             : 
    1119             :     Assert(TransactionIdIsValid(txn->xid));
    1120             : 
    1121             :     /* transaction ID */
    1122          92 :     pq_sendint32(out, txn->xid);
    1123             : 
    1124             :     /* send the flags field (unused for now) */
    1125          92 :     pq_sendbyte(out, flags);
    1126             : 
    1127             :     /* send fields */
    1128          92 :     pq_sendint64(out, commit_lsn);
    1129          92 :     pq_sendint64(out, txn->end_lsn);
    1130          92 :     pq_sendint64(out, txn->xact_time.commit_time);
    1131          92 : }
    1132             : 
    1133             : /*
    1134             :  * Read STREAM COMMIT from the input stream.
    1135             :  */
    1136             : TransactionId
    1137         122 : logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
    1138             : {
    1139             :     TransactionId xid;
    1140             :     uint8       flags;
    1141             : 
    1142         122 :     xid = pq_getmsgint(in, 4);
    1143             : 
    1144             :     /* read flags (unused for now) */
    1145         122 :     flags = pq_getmsgbyte(in);
    1146             : 
    1147         122 :     if (flags != 0)
    1148           0 :         elog(ERROR, "unrecognized flags %u in commit message", flags);
    1149             : 
    1150             :     /* read fields */
    1151         122 :     commit_data->commit_lsn = pq_getmsgint64(in);
    1152         122 :     commit_data->end_lsn = pq_getmsgint64(in);
    1153         122 :     commit_data->committime = pq_getmsgint64(in);
    1154             : 
    1155         122 :     return xid;
    1156             : }
    1157             : 
    1158             : /*
    1159             :  * Write STREAM ABORT to the output stream. Note that xid and subxid will be
    1160             :  * the same for the top-level transaction abort.
    1161             :  *
    1162             :  * If write_abort_info is true, send the abort_lsn and abort_time fields,
    1163             :  * otherwise don't.
    1164             :  */
    1165             : void
    1166          52 : logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
    1167             :                               TransactionId subxid, XLogRecPtr abort_lsn,
    1168             :                               TimestampTz abort_time, bool write_abort_info)
    1169             : {
    1170          52 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
    1171             : 
    1172             :     Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
    1173             : 
    1174             :     /* transaction ID */
    1175          52 :     pq_sendint32(out, xid);
    1176          52 :     pq_sendint32(out, subxid);
    1177             : 
    1178          52 :     if (write_abort_info)
    1179             :     {
    1180          24 :         pq_sendint64(out, abort_lsn);
    1181          24 :         pq_sendint64(out, abort_time);
    1182             :     }
    1183          52 : }
    1184             : 
    1185             : /*
    1186             :  * Read STREAM ABORT from the input stream.
    1187             :  *
    1188             :  * If read_abort_info is true, read the abort_lsn and abort_time fields,
    1189             :  * otherwise don't.
    1190             :  */
    1191             : void
    1192          76 : logicalrep_read_stream_abort(StringInfo in,
    1193             :                              LogicalRepStreamAbortData *abort_data,
    1194             :                              bool read_abort_info)
    1195             : {
    1196             :     Assert(abort_data);
    1197             : 
    1198          76 :     abort_data->xid = pq_getmsgint(in, 4);
    1199          76 :     abort_data->subxid = pq_getmsgint(in, 4);
    1200             : 
    1201          76 :     if (read_abort_info)
    1202             :     {
    1203          48 :         abort_data->abort_lsn = pq_getmsgint64(in);
    1204          48 :         abort_data->abort_time = pq_getmsgint64(in);
    1205             :     }
    1206             :     else
    1207             :     {
    1208          28 :         abort_data->abort_lsn = InvalidXLogRecPtr;
    1209          28 :         abort_data->abort_time = 0;
    1210             :     }
    1211          76 : }
    1212             : 
    1213             : /*
    1214             :  * Get string representing LogicalRepMsgType.
    1215             :  */
    1216             : const char *
    1217         678 : logicalrep_message_type(LogicalRepMsgType action)
    1218             : {
    1219             :     static char err_unknown[20];
    1220             : 
    1221         678 :     switch (action)
    1222             :     {
    1223           2 :         case LOGICAL_REP_MSG_BEGIN:
    1224           2 :             return "BEGIN";
    1225           2 :         case LOGICAL_REP_MSG_COMMIT:
    1226           2 :             return "COMMIT";
    1227           0 :         case LOGICAL_REP_MSG_ORIGIN:
    1228           0 :             return "ORIGIN";
    1229          74 :         case LOGICAL_REP_MSG_INSERT:
    1230          74 :             return "INSERT";
    1231          18 :         case LOGICAL_REP_MSG_UPDATE:
    1232          18 :             return "UPDATE";
    1233          10 :         case LOGICAL_REP_MSG_DELETE:
    1234          10 :             return "DELETE";
    1235           0 :         case LOGICAL_REP_MSG_TRUNCATE:
    1236           0 :             return "TRUNCATE";
    1237           4 :         case LOGICAL_REP_MSG_RELATION:
    1238           4 :             return "RELATION";
    1239           0 :         case LOGICAL_REP_MSG_TYPE:
    1240           0 :             return "TYPE";
    1241           0 :         case LOGICAL_REP_MSG_MESSAGE:
    1242           0 :             return "MESSAGE";
    1243           2 :         case LOGICAL_REP_MSG_BEGIN_PREPARE:
    1244           2 :             return "BEGIN PREPARE";
    1245           2 :         case LOGICAL_REP_MSG_PREPARE:
    1246           2 :             return "PREPARE";
    1247           0 :         case LOGICAL_REP_MSG_COMMIT_PREPARED:
    1248           0 :             return "COMMIT PREPARED";
    1249           0 :         case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
    1250           0 :             return "ROLLBACK PREPARED";
    1251          26 :         case LOGICAL_REP_MSG_STREAM_START:
    1252          26 :             return "STREAM START";
    1253         456 :         case LOGICAL_REP_MSG_STREAM_STOP:
    1254         456 :             return "STREAM STOP";
    1255          40 :         case LOGICAL_REP_MSG_STREAM_COMMIT:
    1256          40 :             return "STREAM COMMIT";
    1257          38 :         case LOGICAL_REP_MSG_STREAM_ABORT:
    1258          38 :             return "STREAM ABORT";
    1259           4 :         case LOGICAL_REP_MSG_STREAM_PREPARE:
    1260           4 :             return "STREAM PREPARE";
    1261             :     }
    1262             : 
    1263             :     /*
    1264             :      * This message provides context in the error raised when applying a
    1265             :      * logical message. So we can't throw an error here. Return an unknown
    1266             :      * indicator value so that the original error is still reported.
    1267             :      */
    1268           0 :     snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);
    1269             : 
    1270           0 :     return err_unknown;
    1271             : }

Generated by: LCOV version 1.14