LCOV - code coverage report
Current view: top level - src/backend/replication/logical - proto.c (source / functions) Hit Total Coverage
Test: PostgreSQL 15devel Lines: 424 487 87.1 %
Date: 2021-12-09 03:08:47 Functions: 44 45 97.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * proto.c
       4             :  *      logical replication protocol functions
       5             :  *
       6             :  * Copyright (c) 2015-2021, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *      src/backend/replication/logical/proto.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "access/sysattr.h"
      16             : #include "catalog/pg_namespace.h"
      17             : #include "catalog/pg_type.h"
      18             : #include "libpq/pqformat.h"
      19             : #include "replication/logicalproto.h"
      20             : #include "utils/lsyscache.h"
      21             : #include "utils/syscache.h"
      22             : 
      23             : /*
      24             :  * Protocol message flags.
      25             :  */
      26             : #define LOGICALREP_IS_REPLICA_IDENTITY 1
      27             : 
      28             : #define MESSAGE_TRANSACTIONAL (1<<0)
      29             : #define TRUNCATE_CASCADE        (1<<0)
      30             : #define TRUNCATE_RESTART_SEQS   (1<<1)
      31             : 
      32             : static void logicalrep_write_attrs(StringInfo out, Relation rel);
      33             : static void logicalrep_write_tuple(StringInfo out, Relation rel,
      34             :                                    HeapTuple tuple, bool binary);
      35             : 
      36             : static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
      37             : static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
      38             : 
      39             : static void logicalrep_write_namespace(StringInfo out, Oid nspid);
      40             : static const char *logicalrep_read_namespace(StringInfo in);
      41             : 
      42             : /*
      43             :  * Write BEGIN to the output stream.
      44             :  */
      45             : void
      46         462 : logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
      47             : {
      48         462 :     pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
      49             : 
      50             :     /* fixed fields */
      51         462 :     pq_sendint64(out, txn->final_lsn);
      52         462 :     pq_sendint64(out, txn->xact_time.commit_time);
      53         462 :     pq_sendint32(out, txn->xid);
      54         462 : }
      55             : 
      56             : /*
      57             :  * Read transaction BEGIN from the stream.
      58             :  */
      59             : void
      60         664 : logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
      61             : {
      62             :     /* read fields */
      63         664 :     begin_data->final_lsn = pq_getmsgint64(in);
      64         664 :     if (begin_data->final_lsn == InvalidXLogRecPtr)
      65           0 :         elog(ERROR, "final_lsn not set in begin message");
      66         664 :     begin_data->committime = pq_getmsgint64(in);
      67         664 :     begin_data->xid = pq_getmsgint(in, 4);
      68         664 : }
      69             : 
      70             : 
      71             : /*
      72             :  * Write COMMIT to the output stream.
      73             :  */
      74             : void
      75         462 : logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
      76             :                         XLogRecPtr commit_lsn)
      77             : {
      78         462 :     uint8       flags = 0;
      79             : 
      80         462 :     pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
      81             : 
      82             :     /* send the flags field (unused for now) */
      83         462 :     pq_sendbyte(out, flags);
      84             : 
      85             :     /* send fields */
      86         462 :     pq_sendint64(out, commit_lsn);
      87         462 :     pq_sendint64(out, txn->end_lsn);
      88         462 :     pq_sendint64(out, txn->xact_time.commit_time);
      89         462 : }
      90             : 
      91             : /*
      92             :  * Read transaction COMMIT from the stream.
      93             :  */
      94             : void
      95         652 : logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
      96             : {
      97             :     /* read flags (unused for now) */
      98         652 :     uint8       flags = pq_getmsgbyte(in);
      99             : 
     100         652 :     if (flags != 0)
     101           0 :         elog(ERROR, "unrecognized flags %u in commit message", flags);
     102             : 
     103             :     /* read fields */
     104         652 :     commit_data->commit_lsn = pq_getmsgint64(in);
     105         652 :     commit_data->end_lsn = pq_getmsgint64(in);
     106         652 :     commit_data->committime = pq_getmsgint64(in);
     107         652 : }
     108             : 
     109             : /*
     110             :  * Write BEGIN PREPARE to the output stream.
     111             :  */
     112             : void
     113          32 : logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
     114             : {
     115          32 :     pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE);
     116             : 
     117             :     /* fixed fields */
     118          32 :     pq_sendint64(out, txn->final_lsn);
     119          32 :     pq_sendint64(out, txn->end_lsn);
     120          32 :     pq_sendint64(out, txn->xact_time.prepare_time);
     121          32 :     pq_sendint32(out, txn->xid);
     122             : 
     123             :     /* send gid */
     124          32 :     pq_sendstring(out, txn->gid);
     125          32 : }
     126             : 
     127             : /*
     128             :  * Read transaction BEGIN PREPARE from the stream.
     129             :  */
     130             : void
     131          24 : logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
     132             : {
     133             :     /* read fields */
     134          24 :     begin_data->prepare_lsn = pq_getmsgint64(in);
     135          24 :     if (begin_data->prepare_lsn == InvalidXLogRecPtr)
     136           0 :         elog(ERROR, "prepare_lsn not set in begin prepare message");
     137          24 :     begin_data->end_lsn = pq_getmsgint64(in);
     138          24 :     if (begin_data->end_lsn == InvalidXLogRecPtr)
     139           0 :         elog(ERROR, "end_lsn not set in begin prepare message");
     140          24 :     begin_data->prepare_time = pq_getmsgint64(in);
     141          24 :     begin_data->xid = pq_getmsgint(in, 4);
     142             : 
     143             :     /* read gid (copy it into a pre-allocated buffer) */
     144          24 :     strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
     145          24 : }
     146             : 
     147             : /*
     148             :  * The core functionality for logicalrep_write_prepare and
     149             :  * logicalrep_write_stream_prepare.
     150             :  */
     151             : static void
     152          48 : logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
     153             :                                 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
     154             : {
     155          48 :     uint8       flags = 0;
     156             : 
     157          48 :     pq_sendbyte(out, type);
     158             : 
     159             :     /*
     160             :      * This should only ever happen for two-phase commit transactions, in
     161             :      * which case we expect to have a valid GID.
     162             :      */
     163             :     Assert(txn->gid != NULL);
     164             :     Assert(rbtxn_prepared(txn));
     165             :     Assert(TransactionIdIsValid(txn->xid));
     166             : 
     167             :     /* send the flags field */
     168          48 :     pq_sendbyte(out, flags);
     169             : 
     170             :     /* send fields */
     171          48 :     pq_sendint64(out, prepare_lsn);
     172          48 :     pq_sendint64(out, txn->end_lsn);
     173          48 :     pq_sendint64(out, txn->xact_time.prepare_time);
     174          48 :     pq_sendint32(out, txn->xid);
     175             : 
     176             :     /* send gid */
     177          48 :     pq_sendstring(out, txn->gid);
     178          48 : }
     179             : 
     180             : /*
     181             :  * Write PREPARE to the output stream.
     182             :  */
     183             : void
     184          32 : logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
     185             :                          XLogRecPtr prepare_lsn)
     186             : {
     187          32 :     logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
     188             :                                     txn, prepare_lsn);
     189          32 : }
     190             : 
     191             : /*
     192             :  * The core functionality for logicalrep_read_prepare and
     193             :  * logicalrep_read_stream_prepare.
     194             :  */
     195             : static void
     196          36 : logicalrep_read_prepare_common(StringInfo in, char *msgtype,
     197             :                                LogicalRepPreparedTxnData *prepare_data)
     198             : {
     199             :     /* read flags */
     200          36 :     uint8       flags = pq_getmsgbyte(in);
     201             : 
     202          36 :     if (flags != 0)
     203           0 :         elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
     204             : 
     205             :     /* read fields */
     206          36 :     prepare_data->prepare_lsn = pq_getmsgint64(in);
     207          36 :     if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
     208           0 :         elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
     209          36 :     prepare_data->end_lsn = pq_getmsgint64(in);
     210          36 :     if (prepare_data->end_lsn == InvalidXLogRecPtr)
     211           0 :         elog(ERROR, "end_lsn is not set in %s message", msgtype);
     212          36 :     prepare_data->prepare_time = pq_getmsgint64(in);
     213          36 :     prepare_data->xid = pq_getmsgint(in, 4);
     214          36 :     if (prepare_data->xid == InvalidTransactionId)
     215           0 :         elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
     216             : 
     217             :     /* read gid (copy it into a pre-allocated buffer) */
     218          36 :     strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
     219          36 : }
     220             : 
     221             : /*
     222             :  * Read transaction PREPARE from the stream.
     223             :  */
     224             : void
     225          24 : logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
     226             : {
     227          24 :     logicalrep_read_prepare_common(in, "prepare", prepare_data);
     228          24 : }
     229             : 
     230             : /*
     231             :  * Write COMMIT PREPARED to the output stream.
     232             :  */
     233             : void
     234          34 : logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
     235             :                                  XLogRecPtr commit_lsn)
     236             : {
     237          34 :     uint8       flags = 0;
     238             : 
     239          34 :     pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
     240             : 
     241             :     /*
     242             :      * This should only ever happen for two-phase commit transactions, in
     243             :      * which case we expect to have a valid GID.
     244             :      */
     245             :     Assert(txn->gid != NULL);
     246             : 
     247             :     /* send the flags field */
     248          34 :     pq_sendbyte(out, flags);
     249             : 
     250             :     /* send fields */
     251          34 :     pq_sendint64(out, commit_lsn);
     252          34 :     pq_sendint64(out, txn->end_lsn);
     253          34 :     pq_sendint64(out, txn->xact_time.commit_time);
     254          34 :     pq_sendint32(out, txn->xid);
     255             : 
     256             :     /* send gid */
     257          34 :     pq_sendstring(out, txn->gid);
     258          34 : }
     259             : 
     260             : /*
     261             :  * Read transaction COMMIT PREPARED from the stream.
     262             :  */
     263             : void
     264          30 : logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
     265             : {
     266             :     /* read flags */
     267          30 :     uint8       flags = pq_getmsgbyte(in);
     268             : 
     269          30 :     if (flags != 0)
     270           0 :         elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
     271             : 
     272             :     /* read fields */
     273          30 :     prepare_data->commit_lsn = pq_getmsgint64(in);
     274          30 :     if (prepare_data->commit_lsn == InvalidXLogRecPtr)
     275           0 :         elog(ERROR, "commit_lsn is not set in commit prepared message");
     276          30 :     prepare_data->end_lsn = pq_getmsgint64(in);
     277          30 :     if (prepare_data->end_lsn == InvalidXLogRecPtr)
     278           0 :         elog(ERROR, "end_lsn is not set in commit prepared message");
     279          30 :     prepare_data->commit_time = pq_getmsgint64(in);
     280          30 :     prepare_data->xid = pq_getmsgint(in, 4);
     281             : 
     282             :     /* read gid (copy it into a pre-allocated buffer) */
     283          30 :     strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
     284          30 : }
     285             : 
     286             : /*
     287             :  * Write ROLLBACK PREPARED to the output stream.
     288             :  */
     289             : void
     290          14 : logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
     291             :                                    XLogRecPtr prepare_end_lsn,
     292             :                                    TimestampTz prepare_time)
     293             : {
     294          14 :     uint8       flags = 0;
     295             : 
     296          14 :     pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
     297             : 
     298             :     /*
     299             :      * This should only ever happen for two-phase commit transactions, in
     300             :      * which case we expect to have a valid GID.
     301             :      */
     302             :     Assert(txn->gid != NULL);
     303             : 
     304             :     /* send the flags field */
     305          14 :     pq_sendbyte(out, flags);
     306             : 
     307             :     /* send fields */
     308          14 :     pq_sendint64(out, prepare_end_lsn);
     309          14 :     pq_sendint64(out, txn->end_lsn);
     310          14 :     pq_sendint64(out, prepare_time);
     311          14 :     pq_sendint64(out, txn->xact_time.commit_time);
     312          14 :     pq_sendint32(out, txn->xid);
     313             : 
     314             :     /* send gid */
     315          14 :     pq_sendstring(out, txn->gid);
     316          14 : }
     317             : 
     318             : /*
     319             :  * Read transaction ROLLBACK PREPARED from the stream.
     320             :  */
     321             : void
     322           8 : logicalrep_read_rollback_prepared(StringInfo in,
     323             :                                   LogicalRepRollbackPreparedTxnData *rollback_data)
     324             : {
     325             :     /* read flags */
     326           8 :     uint8       flags = pq_getmsgbyte(in);
     327             : 
     328           8 :     if (flags != 0)
     329           0 :         elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
     330             : 
     331             :     /* read fields */
     332           8 :     rollback_data->prepare_end_lsn = pq_getmsgint64(in);
     333           8 :     if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
     334           0 :         elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
     335           8 :     rollback_data->rollback_end_lsn = pq_getmsgint64(in);
     336           8 :     if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
     337           0 :         elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
     338           8 :     rollback_data->prepare_time = pq_getmsgint64(in);
     339           8 :     rollback_data->rollback_time = pq_getmsgint64(in);
     340           8 :     rollback_data->xid = pq_getmsgint(in, 4);
     341             : 
     342             :     /* read gid (copy it into a pre-allocated buffer) */
     343           8 :     strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
     344           8 : }
     345             : 
     346             : /*
     347             :  * Write STREAM PREPARE to the output stream.
     348             :  */
     349             : void
     350          16 : logicalrep_write_stream_prepare(StringInfo out,
     351             :                                 ReorderBufferTXN *txn,
     352             :                                 XLogRecPtr prepare_lsn)
     353             : {
     354          16 :     logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE,
     355             :                                     txn, prepare_lsn);
     356          16 : }
     357             : 
     358             : /*
     359             :  * Read STREAM PREPARE from the stream.
     360             :  */
     361             : void
     362          12 : logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
     363             : {
     364          12 :     logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
     365          12 : }
     366             : 
     367             : /*
     368             :  * Write ORIGIN to the output stream.
     369             :  */
     370             : void
     371          22 : logicalrep_write_origin(StringInfo out, const char *origin,
     372             :                         XLogRecPtr origin_lsn)
     373             : {
     374          22 :     pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
     375             : 
     376             :     /* fixed fields */
     377          22 :     pq_sendint64(out, origin_lsn);
     378             : 
     379             :     /* origin string */
     380          22 :     pq_sendstring(out, origin);
     381          22 : }
     382             : 
     383             : /*
     384             :  * Read ORIGIN from the input stream.
     385             :  */
     386             : char *
     387           0 : logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
     388             : {
     389             :     /* fixed fields */
     390           0 :     *origin_lsn = pq_getmsgint64(in);
     391             : 
     392             :     /* return origin */
     393           0 :     return pstrdup(pq_getmsgstring(in));
     394             : }
     395             : 
     396             : /*
     397             :  * Write INSERT to the output stream.
     398             :  */
     399             : void
     400      152956 : logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
     401             :                         HeapTuple newtuple, bool binary)
     402             : {
     403      152956 :     pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
     404             : 
     405             :     /* transaction ID (if not valid, we're not streaming) */
     406      152956 :     if (TransactionIdIsValid(xid))
     407      151964 :         pq_sendint32(out, xid);
     408             : 
     409             :     /* use Oid as relation identifier */
     410      152956 :     pq_sendint32(out, RelationGetRelid(rel));
     411             : 
     412      152956 :     pq_sendbyte(out, 'N');      /* new tuple follows */
     413      152956 :     logicalrep_write_tuple(out, rel, newtuple, binary);
     414      152956 : }
     415             : 
     416             : /*
     417             :  * Read INSERT from stream.
     418             :  *
     419             :  * Fills the new tuple.
     420             :  */
     421             : LogicalRepRelId
     422      103024 : logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
     423             : {
     424             :     char        action;
     425             :     LogicalRepRelId relid;
     426             : 
     427             :     /* read the relation id */
     428      103024 :     relid = pq_getmsgint(in, 4);
     429             : 
     430      103024 :     action = pq_getmsgbyte(in);
     431      103024 :     if (action != 'N')
     432           0 :         elog(ERROR, "expected new tuple but got %d",
     433             :              action);
     434             : 
     435      103024 :     logicalrep_read_tuple(in, newtup);
     436             : 
     437      103024 :     return relid;
     438             : }
     439             : 
     440             : /*
     441             :  * Write UPDATE to the output stream.
     442             :  */
     443             : void
     444       77758 : logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
     445             :                         HeapTuple oldtuple, HeapTuple newtuple, bool binary)
     446             : {
     447       77758 :     pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
     448             : 
     449             :     Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
     450             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     451             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
     452             : 
     453             :     /* transaction ID (if not valid, we're not streaming) */
     454       77758 :     if (TransactionIdIsValid(xid))
     455       77504 :         pq_sendint32(out, xid);
     456             : 
     457             :     /* use Oid as relation identifier */
     458       77758 :     pq_sendint32(out, RelationGetRelid(rel));
     459             : 
     460       77758 :     if (oldtuple != NULL)
     461             :     {
     462         138 :         if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     463          48 :             pq_sendbyte(out, 'O');  /* old tuple follows */
     464             :         else
     465          90 :             pq_sendbyte(out, 'K');  /* old key follows */
     466         138 :         logicalrep_write_tuple(out, rel, oldtuple, binary);
     467             :     }
     468             : 
     469       77758 :     pq_sendbyte(out, 'N');      /* new tuple follows */
     470       77758 :     logicalrep_write_tuple(out, rel, newtuple, binary);
     471       77758 : }
     472             : 
     473             : /*
     474             :  * Read UPDATE from stream.
     475             :  */
     476             : LogicalRepRelId
     477       57788 : logicalrep_read_update(StringInfo in, bool *has_oldtuple,
     478             :                        LogicalRepTupleData *oldtup,
     479             :                        LogicalRepTupleData *newtup)
     480             : {
     481             :     char        action;
     482             :     LogicalRepRelId relid;
     483             : 
     484             :     /* read the relation id */
     485       57788 :     relid = pq_getmsgint(in, 4);
     486             : 
     487             :     /* read and verify action */
     488       57788 :     action = pq_getmsgbyte(in);
     489       57788 :     if (action != 'K' && action != 'O' && action != 'N')
     490           0 :         elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
     491             :              action);
     492             : 
     493             :     /* check for old tuple */
     494       57788 :     if (action == 'K' || action == 'O')
     495             :     {
     496         162 :         logicalrep_read_tuple(in, oldtup);
     497         162 :         *has_oldtuple = true;
     498             : 
     499         162 :         action = pq_getmsgbyte(in);
     500             :     }
     501             :     else
     502       57626 :         *has_oldtuple = false;
     503             : 
     504             :     /* check for new tuple */
     505       57788 :     if (action != 'N')
     506           0 :         elog(ERROR, "expected action 'N', got %c",
     507             :              action);
     508             : 
     509       57788 :     logicalrep_read_tuple(in, newtup);
     510             : 
     511       57788 :     return relid;
     512             : }
     513             : 
     514             : /*
     515             :  * Write DELETE to the output stream.
     516             :  */
     517             : void
     518       72184 : logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
     519             :                         HeapTuple oldtuple, bool binary)
     520             : {
     521             :     Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
     522             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     523             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
     524             : 
     525       72184 :     pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
     526             : 
     527             :     /* transaction ID (if not valid, we're not streaming) */
     528       72184 :     if (TransactionIdIsValid(xid))
     529       71778 :         pq_sendint32(out, xid);
     530             : 
     531             :     /* use Oid as relation identifier */
     532       72184 :     pq_sendint32(out, RelationGetRelid(rel));
     533             : 
     534       72184 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     535         202 :         pq_sendbyte(out, 'O');  /* old tuple follows */
     536             :     else
     537       71982 :         pq_sendbyte(out, 'K');  /* old key follows */
     538             : 
     539       72184 :     logicalrep_write_tuple(out, rel, oldtuple, binary);
     540       72184 : }
     541             : 
     542             : /*
     543             :  * Read DELETE from stream.
     544             :  *
     545             :  * Fills the old tuple.
     546             :  */
     547             : LogicalRepRelId
     548       55456 : logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
     549             : {
     550             :     char        action;
     551             :     LogicalRepRelId relid;
     552             : 
     553             :     /* read the relation id */
     554       55456 :     relid = pq_getmsgint(in, 4);
     555             : 
     556             :     /* read and verify action */
     557       55456 :     action = pq_getmsgbyte(in);
     558       55456 :     if (action != 'K' && action != 'O')
     559           0 :         elog(ERROR, "expected action 'O' or 'K', got %c", action);
     560             : 
     561       55456 :     logicalrep_read_tuple(in, oldtup);
     562             : 
     563       55456 :     return relid;
     564             : }
     565             : 
     566             : /*
     567             :  * Write TRUNCATE to the output stream.
     568             :  */
     569             : void
     570           6 : logicalrep_write_truncate(StringInfo out,
     571             :                           TransactionId xid,
     572             :                           int nrelids,
     573             :                           Oid relids[],
     574             :                           bool cascade, bool restart_seqs)
     575             : {
     576             :     int         i;
     577           6 :     uint8       flags = 0;
     578             : 
     579           6 :     pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
     580             : 
     581             :     /* transaction ID (if not valid, we're not streaming) */
     582           6 :     if (TransactionIdIsValid(xid))
     583           0 :         pq_sendint32(out, xid);
     584             : 
     585           6 :     pq_sendint32(out, nrelids);
     586             : 
     587             :     /* encode and send truncate flags */
     588           6 :     if (cascade)
     589           0 :         flags |= TRUNCATE_CASCADE;
     590           6 :     if (restart_seqs)
     591           0 :         flags |= TRUNCATE_RESTART_SEQS;
     592           6 :     pq_sendint8(out, flags);
     593             : 
     594          16 :     for (i = 0; i < nrelids; i++)
     595          10 :         pq_sendint32(out, relids[i]);
     596           6 : }
     597             : 
     598             : /*
     599             :  * Read TRUNCATE from stream.
     600             :  */
     601             : List *
     602          30 : logicalrep_read_truncate(StringInfo in,
     603             :                          bool *cascade, bool *restart_seqs)
     604             : {
     605             :     int         i;
     606             :     int         nrelids;
     607          30 :     List       *relids = NIL;
     608             :     uint8       flags;
     609             : 
     610          30 :     nrelids = pq_getmsgint(in, 4);
     611             : 
     612             :     /* read and decode truncate flags */
     613          30 :     flags = pq_getmsgint(in, 1);
     614          30 :     *cascade = (flags & TRUNCATE_CASCADE) > 0;
     615          30 :     *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
     616             : 
     617          76 :     for (i = 0; i < nrelids; i++)
     618          46 :         relids = lappend_oid(relids, pq_getmsgint(in, 4));
     619             : 
     620          30 :     return relids;
     621             : }
     622             : 
     623             : /*
     624             :  * Write MESSAGE to the output stream.
     625             :  */
     626             : void
     627          10 : logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
     628             :                          bool transactional, const char *prefix, Size sz,
     629             :                          const char *message)
     630             : {
     631          10 :     uint8       flags = 0;
     632             : 
     633          10 :     pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
     634             : 
     635             :     /* encode and send message flags */
     636          10 :     if (transactional)
     637           4 :         flags |= MESSAGE_TRANSACTIONAL;
     638             : 
     639             :     /* transaction ID (if not valid, we're not streaming) */
     640          10 :     if (TransactionIdIsValid(xid))
     641           0 :         pq_sendint32(out, xid);
     642             : 
     643          10 :     pq_sendint8(out, flags);
     644          10 :     pq_sendint64(out, lsn);
     645          10 :     pq_sendstring(out, prefix);
     646          10 :     pq_sendint32(out, sz);
     647          10 :     pq_sendbytes(out, message, sz);
     648          10 : }
     649             : 
     650             : /*
     651             :  * Write relation description to the output stream.
     652             :  */
     653             : void
     654         238 : logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
     655             : {
     656             :     char       *relname;
     657             : 
     658         238 :     pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
     659             : 
     660             :     /* transaction ID (if not valid, we're not streaming) */
     661         238 :     if (TransactionIdIsValid(xid))
     662          66 :         pq_sendint32(out, xid);
     663             : 
     664             :     /* use Oid as relation identifier */
     665         238 :     pq_sendint32(out, RelationGetRelid(rel));
     666             : 
     667             :     /* send qualified relation name */
     668         238 :     logicalrep_write_namespace(out, RelationGetNamespace(rel));
     669         238 :     relname = RelationGetRelationName(rel);
     670         238 :     pq_sendstring(out, relname);
     671             : 
     672             :     /* send replica identity */
     673         238 :     pq_sendbyte(out, rel->rd_rel->relreplident);
     674             : 
     675             :     /* send the attribute info */
     676         238 :     logicalrep_write_attrs(out, rel);
     677         238 : }
     678             : 
     679             : /*
     680             :  * Read the relation info from stream and return as LogicalRepRelation.
     681             :  */
     682             : LogicalRepRelation *
     683         324 : logicalrep_read_rel(StringInfo in)
     684             : {
     685         324 :     LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
     686             : 
     687         324 :     rel->remoteid = pq_getmsgint(in, 4);
     688             : 
     689             :     /* Read relation name from stream */
     690         324 :     rel->nspname = pstrdup(logicalrep_read_namespace(in));
     691         324 :     rel->relname = pstrdup(pq_getmsgstring(in));
     692             : 
     693             :     /* Read the replica identity. */
     694         324 :     rel->replident = pq_getmsgbyte(in);
     695             : 
     696             :     /* Get attribute description */
     697         324 :     logicalrep_read_attrs(in, rel);
     698             : 
     699         324 :     return rel;
     700             : }
     701             : 
     702             : /*
     703             :  * Write type info to the output stream.
     704             :  *
     705             :  * This function will always write base type info.
     706             :  */
     707             : void
     708          32 : logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
     709             : {
     710          32 :     Oid         basetypoid = getBaseType(typoid);
     711             :     HeapTuple   tup;
     712             :     Form_pg_type typtup;
     713             : 
     714          32 :     pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
     715             : 
     716             :     /* transaction ID (if not valid, we're not streaming) */
     717          32 :     if (TransactionIdIsValid(xid))
     718           0 :         pq_sendint32(out, xid);
     719             : 
     720          32 :     tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
     721          32 :     if (!HeapTupleIsValid(tup))
     722           0 :         elog(ERROR, "cache lookup failed for type %u", basetypoid);
     723          32 :     typtup = (Form_pg_type) GETSTRUCT(tup);
     724             : 
     725             :     /* use Oid as relation identifier */
     726          32 :     pq_sendint32(out, typoid);
     727             : 
     728             :     /* send qualified type name */
     729          32 :     logicalrep_write_namespace(out, typtup->typnamespace);
     730          32 :     pq_sendstring(out, NameStr(typtup->typname));
     731             : 
     732          32 :     ReleaseSysCache(tup);
     733          32 : }
     734             : 
     735             : /*
     736             :  * Read type info from the output stream.
     737             :  */
     738             : void
     739          32 : logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
     740             : {
     741          32 :     ltyp->remoteid = pq_getmsgint(in, 4);
     742             : 
     743             :     /* Read type name from stream */
     744          32 :     ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
     745          32 :     ltyp->typname = pstrdup(pq_getmsgstring(in));
     746          32 : }
     747             : 
     748             : /*
     749             :  * Write a tuple to the outputstream, in the most efficient format possible.
     750             :  */
     751             : static void
     752      303036 : logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
     753             : {
     754             :     TupleDesc   desc;
     755             :     Datum       values[MaxTupleAttributeNumber];
     756             :     bool        isnull[MaxTupleAttributeNumber];
     757             :     int         i;
     758      303036 :     uint16      nliveatts = 0;
     759             : 
     760      303036 :     desc = RelationGetDescr(rel);
     761             : 
     762      983180 :     for (i = 0; i < desc->natts; i++)
     763             :     {
     764      680144 :         if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
     765           4 :             continue;
     766      680140 :         nliveatts++;
     767             :     }
     768      303036 :     pq_sendint16(out, nliveatts);
     769             : 
     770             :     /* try to allocate enough memory from the get-go */
     771      303036 :     enlargeStringInfo(out, tuple->t_len +
     772      303036 :                       nliveatts * (1 + 4));
     773             : 
     774      303036 :     heap_deform_tuple(tuple, desc, values, isnull);
     775             : 
     776             :     /* Write the values */
     777      983180 :     for (i = 0; i < desc->natts; i++)
     778             :     {
     779             :         HeapTuple   typtup;
     780             :         Form_pg_type typclass;
     781      680144 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     782             : 
     783      680144 :         if (att->attisdropped || att->attgenerated)
     784           4 :             continue;
     785             : 
     786      680140 :         if (isnull[i])
     787             :         {
     788       92168 :             pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
     789       92168 :             continue;
     790             :         }
     791             : 
     792      587972 :         if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
     793             :         {
     794             :             /*
     795             :              * Unchanged toasted datum.  (Note that we don't promise to detect
     796             :              * unchanged data in general; this is just a cheap check to avoid
     797             :              * sending large values unnecessarily.)
     798             :              */
     799           6 :             pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
     800           6 :             continue;
     801             :         }
     802             : 
     803      587966 :         typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
     804      587966 :         if (!HeapTupleIsValid(typtup))
     805           0 :             elog(ERROR, "cache lookup failed for type %u", att->atttypid);
     806      587966 :         typclass = (Form_pg_type) GETSTRUCT(typtup);
     807             : 
     808             :         /*
     809             :          * Send in binary if requested and type has suitable send function.
     810             :          */
     811      587966 :         if (binary && OidIsValid(typclass->typsend))
     812       66746 :         {
     813             :             bytea      *outputbytes;
     814             :             int         len;
     815             : 
     816       66746 :             pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
     817       66746 :             outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
     818       66746 :             len = VARSIZE(outputbytes) - VARHDRSZ;
     819       66746 :             pq_sendint(out, len, 4);    /* length */
     820       66746 :             pq_sendbytes(out, VARDATA(outputbytes), len);   /* data */
     821       66746 :             pfree(outputbytes);
     822             :         }
     823             :         else
     824             :         {
     825             :             char       *outputstr;
     826             : 
     827      521220 :             pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
     828      521220 :             outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
     829      521220 :             pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
     830      521220 :             pfree(outputstr);
     831             :         }
     832             : 
     833      587966 :         ReleaseSysCache(typtup);
     834             :     }
     835      303036 : }
     836             : 
     837             : /*
     838             :  * Read tuple in logical replication format from stream.
     839             :  */
     840             : static void
     841      216430 : logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
     842             : {
     843             :     int         i;
     844             :     int         natts;
     845             : 
     846             :     /* Get number of attributes */
     847      216430 :     natts = pq_getmsgint(in, 2);
     848             : 
     849             :     /* Allocate space for per-column values; zero out unused StringInfoDatas */
     850      216430 :     tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
     851      216430 :     tuple->colstatus = (char *) palloc(natts * sizeof(char));
     852      216430 :     tuple->ncols = natts;
     853             : 
     854             :     /* Read the data */
     855      723384 :     for (i = 0; i < natts; i++)
     856             :     {
     857             :         char        kind;
     858             :         int         len;
     859      506954 :         StringInfo  value = &tuple->colvalues[i];
     860             : 
     861      506954 :         kind = pq_getmsgbyte(in);
     862      506954 :         tuple->colstatus[i] = kind;
     863             : 
     864      506954 :         switch (kind)
     865             :         {
     866       75564 :             case LOGICALREP_COLUMN_NULL:
     867             :                 /* nothing more to do */
     868       75564 :                 break;
     869           6 :             case LOGICALREP_COLUMN_UNCHANGED:
     870             :                 /* we don't receive the value of an unchanged column */
     871           6 :                 break;
     872      364646 :             case LOGICALREP_COLUMN_TEXT:
     873      364646 :                 len = pq_getmsgint(in, 4);  /* read length */
     874             : 
     875             :                 /* and data */
     876      364646 :                 value->data = palloc(len + 1);
     877      364646 :                 pq_copymsgbytes(in, value->data, len);
     878      364646 :                 value->data[len] = '\0';
     879             :                 /* make StringInfo fully valid */
     880      364646 :                 value->len = len;
     881      364646 :                 value->cursor = 0;
     882      364646 :                 value->maxlen = len;
     883      364646 :                 break;
     884       66738 :             case LOGICALREP_COLUMN_BINARY:
     885       66738 :                 len = pq_getmsgint(in, 4);  /* read length */
     886             : 
     887             :                 /* and data */
     888       66738 :                 value->data = palloc(len + 1);
     889       66738 :                 pq_copymsgbytes(in, value->data, len);
     890             :                 /* not strictly necessary but per StringInfo practice */
     891       66738 :                 value->data[len] = '\0';
     892             :                 /* make StringInfo fully valid */
     893       66738 :                 value->len = len;
     894       66738 :                 value->cursor = 0;
     895       66738 :                 value->maxlen = len;
     896       66738 :                 break;
     897           0 :             default:
     898           0 :                 elog(ERROR, "unrecognized data representation type '%c'", kind);
     899             :         }
     900             :     }
     901      216430 : }
     902             : 
     903             : /*
     904             :  * Write relation attribute metadata to the stream.
     905             :  */
     906             : static void
     907         238 : logicalrep_write_attrs(StringInfo out, Relation rel)
     908             : {
     909             :     TupleDesc   desc;
     910             :     int         i;
     911         238 :     uint16      nliveatts = 0;
     912         238 :     Bitmapset  *idattrs = NULL;
     913             :     bool        replidentfull;
     914             : 
     915         238 :     desc = RelationGetDescr(rel);
     916             : 
     917             :     /* send number of live attributes */
     918         734 :     for (i = 0; i < desc->natts; i++)
     919             :     {
     920         496 :         if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
     921           2 :             continue;
     922         494 :         nliveatts++;
     923             :     }
     924         238 :     pq_sendint16(out, nliveatts);
     925             : 
     926             :     /* fetch bitmap of REPLICATION IDENTITY attributes */
     927         238 :     replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
     928         238 :     if (!replidentfull)
     929         226 :         idattrs = RelationGetIdentityKeyBitmap(rel);
     930             : 
     931             :     /* send the attributes */
     932         734 :     for (i = 0; i < desc->natts; i++)
     933             :     {
     934         496 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     935         496 :         uint8       flags = 0;
     936             : 
     937         496 :         if (att->attisdropped || att->attgenerated)
     938           2 :             continue;
     939             : 
     940             :         /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
     941         974 :         if (replidentfull ||
     942         480 :             bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
     943             :                           idattrs))
     944         196 :             flags |= LOGICALREP_IS_REPLICA_IDENTITY;
     945             : 
     946         494 :         pq_sendbyte(out, flags);
     947             : 
     948             :         /* attribute name */
     949         494 :         pq_sendstring(out, NameStr(att->attname));
     950             : 
     951             :         /* attribute type id */
     952         494 :         pq_sendint32(out, (int) att->atttypid);
     953             : 
     954             :         /* attribute mode */
     955         494 :         pq_sendint32(out, att->atttypmod);
     956             :     }
     957             : 
     958         238 :     bms_free(idattrs);
     959         238 : }
     960             : 
     961             : /*
     962             :  * Read relation attribute metadata from the stream.
     963             :  */
     964             : static void
     965         324 : logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
     966             : {
     967             :     int         i;
     968             :     int         natts;
     969             :     char      **attnames;
     970             :     Oid        *atttyps;
     971         324 :     Bitmapset  *attkeys = NULL;
     972             : 
     973         324 :     natts = pq_getmsgint(in, 2);
     974         324 :     attnames = palloc(natts * sizeof(char *));
     975         324 :     atttyps = palloc(natts * sizeof(Oid));
     976             : 
     977             :     /* read the attributes */
     978         990 :     for (i = 0; i < natts; i++)
     979             :     {
     980             :         uint8       flags;
     981             : 
     982             :         /* Check for replica identity column */
     983         666 :         flags = pq_getmsgbyte(in);
     984         666 :         if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
     985         274 :             attkeys = bms_add_member(attkeys, i);
     986             : 
     987             :         /* attribute name */
     988         666 :         attnames[i] = pstrdup(pq_getmsgstring(in));
     989             : 
     990             :         /* attribute type id */
     991         666 :         atttyps[i] = (Oid) pq_getmsgint(in, 4);
     992             : 
     993             :         /* we ignore attribute mode for now */
     994         666 :         (void) pq_getmsgint(in, 4);
     995             :     }
     996             : 
     997         324 :     rel->attnames = attnames;
     998         324 :     rel->atttyps = atttyps;
     999         324 :     rel->attkeys = attkeys;
    1000         324 :     rel->natts = natts;
    1001         324 : }
    1002             : 
    1003             : /*
    1004             :  * Write the namespace name or empty string for pg_catalog (to save space).
    1005             :  */
    1006             : static void
    1007         270 : logicalrep_write_namespace(StringInfo out, Oid nspid)
    1008             : {
    1009         270 :     if (nspid == PG_CATALOG_NAMESPACE)
    1010           2 :         pq_sendbyte(out, '\0');
    1011             :     else
    1012             :     {
    1013         268 :         char       *nspname = get_namespace_name(nspid);
    1014             : 
    1015         268 :         if (nspname == NULL)
    1016           0 :             elog(ERROR, "cache lookup failed for namespace %u",
    1017             :                  nspid);
    1018             : 
    1019         268 :         pq_sendstring(out, nspname);
    1020             :     }
    1021         270 : }
    1022             : 
    1023             : /*
    1024             :  * Read the namespace name while treating empty string as pg_catalog.
    1025             :  */
    1026             : static const char *
    1027         356 : logicalrep_read_namespace(StringInfo in)
    1028             : {
    1029         356 :     const char *nspname = pq_getmsgstring(in);
    1030             : 
    1031         356 :     if (nspname[0] == '\0')
    1032           2 :         nspname = "pg_catalog";
    1033             : 
    1034         356 :     return nspname;
    1035             : }
    1036             : 
    1037             : /*
    1038             :  * Write the information for the start stream message to the output stream.
    1039             :  */
    1040             : void
    1041         794 : logicalrep_write_stream_start(StringInfo out,
    1042             :                               TransactionId xid, bool first_segment)
    1043             : {
    1044         794 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
    1045             : 
    1046             :     Assert(TransactionIdIsValid(xid));
    1047             : 
    1048             :     /* transaction ID (we're starting to stream, so must be valid) */
    1049         794 :     pq_sendint32(out, xid);
    1050             : 
    1051             :     /* 1 if this is the first streaming segment for this xid */
    1052         794 :     pq_sendbyte(out, first_segment ? 1 : 0);
    1053         794 : }
    1054             : 
    1055             : /*
    1056             :  * Read the information about the start stream message from output stream.
    1057             :  */
    1058             : TransactionId
    1059         646 : logicalrep_read_stream_start(StringInfo in, bool *first_segment)
    1060             : {
    1061             :     TransactionId xid;
    1062             : 
    1063             :     Assert(first_segment);
    1064             : 
    1065         646 :     xid = pq_getmsgint(in, 4);
    1066         646 :     *first_segment = (pq_getmsgbyte(in) == 1);
    1067             : 
    1068         646 :     return xid;
    1069             : }
    1070             : 
    1071             : /*
    1072             :  * Write the stop stream message to the output stream.
    1073             :  */
    1074             : void
    1075         790 : logicalrep_write_stream_stop(StringInfo out)
    1076             : {
    1077         790 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_STOP);
    1078         790 : }
    1079             : 
    1080             : /*
    1081             :  * Write STREAM COMMIT to the output stream.
    1082             :  */
    1083             : void
    1084          28 : logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
    1085             :                                XLogRecPtr commit_lsn)
    1086             : {
    1087          28 :     uint8       flags = 0;
    1088             : 
    1089          28 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
    1090             : 
    1091             :     Assert(TransactionIdIsValid(txn->xid));
    1092             : 
    1093             :     /* transaction ID */
    1094          28 :     pq_sendint32(out, txn->xid);
    1095             : 
    1096             :     /* send the flags field (unused for now) */
    1097          28 :     pq_sendbyte(out, flags);
    1098             : 
    1099             :     /* send fields */
    1100          28 :     pq_sendint64(out, commit_lsn);
    1101          28 :     pq_sendint64(out, txn->end_lsn);
    1102          28 :     pq_sendint64(out, txn->xact_time.commit_time);
    1103          28 : }
    1104             : 
    1105             : /*
    1106             :  * Read STREAM COMMIT from the output stream.
    1107             :  */
    1108             : TransactionId
    1109          26 : logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
    1110             : {
    1111             :     TransactionId xid;
    1112             :     uint8       flags;
    1113             : 
    1114          26 :     xid = pq_getmsgint(in, 4);
    1115             : 
    1116             :     /* read flags (unused for now) */
    1117          26 :     flags = pq_getmsgbyte(in);
    1118             : 
    1119          26 :     if (flags != 0)
    1120           0 :         elog(ERROR, "unrecognized flags %u in commit message", flags);
    1121             : 
    1122             :     /* read fields */
    1123          26 :     commit_data->commit_lsn = pq_getmsgint64(in);
    1124          26 :     commit_data->end_lsn = pq_getmsgint64(in);
    1125          26 :     commit_data->committime = pq_getmsgint64(in);
    1126             : 
    1127          26 :     return xid;
    1128             : }
    1129             : 
    1130             : /*
    1131             :  * Write STREAM ABORT to the output stream. Note that xid and subxid will be
    1132             :  * same for the top-level transaction abort.
    1133             :  */
    1134             : void
    1135          22 : logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
    1136             :                               TransactionId subxid)
    1137             : {
    1138          22 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
    1139             : 
    1140             :     Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
    1141             : 
    1142             :     /* transaction ID */
    1143          22 :     pq_sendint32(out, xid);
    1144          22 :     pq_sendint32(out, subxid);
    1145          22 : }
    1146             : 
    1147             : /*
    1148             :  * Read STREAM ABORT from the output stream.
    1149             :  */
    1150             : void
    1151          22 : logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
    1152             :                              TransactionId *subxid)
    1153             : {
    1154             :     Assert(xid && subxid);
    1155             : 
    1156          22 :     *xid = pq_getmsgint(in, 4);
    1157          22 :     *subxid = pq_getmsgint(in, 4);
    1158          22 : }
    1159             : 
    1160             : /*
    1161             :  * Get string representing LogicalRepMsgType.
    1162             :  */
    1163             : char *
    1164          34 : logicalrep_message_type(LogicalRepMsgType action)
    1165             : {
    1166          34 :     switch (action)
    1167             :     {
    1168           0 :         case LOGICAL_REP_MSG_BEGIN:
    1169           0 :             return "BEGIN";
    1170           0 :         case LOGICAL_REP_MSG_COMMIT:
    1171           0 :             return "COMMIT";
    1172           0 :         case LOGICAL_REP_MSG_ORIGIN:
    1173           0 :             return "ORIGIN";
    1174          16 :         case LOGICAL_REP_MSG_INSERT:
    1175          16 :             return "INSERT";
    1176           6 :         case LOGICAL_REP_MSG_UPDATE:
    1177           6 :             return "UPDATE";
    1178          10 :         case LOGICAL_REP_MSG_DELETE:
    1179          10 :             return "DELETE";
    1180           0 :         case LOGICAL_REP_MSG_TRUNCATE:
    1181           0 :             return "TRUNCATE";
    1182           0 :         case LOGICAL_REP_MSG_RELATION:
    1183           0 :             return "RELATION";
    1184           0 :         case LOGICAL_REP_MSG_TYPE:
    1185           0 :             return "TYPE";
    1186           0 :         case LOGICAL_REP_MSG_MESSAGE:
    1187           0 :             return "MESSAGE";
    1188           0 :         case LOGICAL_REP_MSG_BEGIN_PREPARE:
    1189           0 :             return "BEGIN PREPARE";
    1190           0 :         case LOGICAL_REP_MSG_PREPARE:
    1191           0 :             return "PREPARE";
    1192           0 :         case LOGICAL_REP_MSG_COMMIT_PREPARED:
    1193           0 :             return "COMMIT PREPARED";
    1194           0 :         case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
    1195           0 :             return "ROLLBACK PREPARED";
    1196           2 :         case LOGICAL_REP_MSG_STREAM_START:
    1197           2 :             return "STREAM START";
    1198           0 :         case LOGICAL_REP_MSG_STREAM_STOP:
    1199           0 :             return "STREAM STOP";
    1200           0 :         case LOGICAL_REP_MSG_STREAM_COMMIT:
    1201           0 :             return "STREAM COMMIT";
    1202           0 :         case LOGICAL_REP_MSG_STREAM_ABORT:
    1203           0 :             return "STREAM ABORT";
    1204           0 :         case LOGICAL_REP_MSG_STREAM_PREPARE:
    1205           0 :             return "STREAM PREPARE";
    1206             :     }
    1207             : 
    1208           0 :     elog(ERROR, "invalid logical replication message type \"%c\"", action);
    1209             : 
    1210             :     return NULL;                /* keep compiler quiet */
    1211             : }

Generated by: LCOV version 1.14