LCOV - code coverage report
Current view: top level - src/backend/replication/logical - proto.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 452 498 90.8 %
Date: 2025-11-16 08:18:22 Functions: 45 46 97.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * proto.c
       4             :  *      logical replication protocol functions
       5             :  *
       6             :  * Copyright (c) 2015-2025, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *      src/backend/replication/logical/proto.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "access/sysattr.h"
      16             : #include "catalog/pg_namespace.h"
      17             : #include "catalog/pg_type.h"
      18             : #include "libpq/pqformat.h"
      19             : #include "replication/logicalproto.h"
      20             : #include "utils/lsyscache.h"
      21             : #include "utils/syscache.h"
      22             : 
      23             : /*
      24             :  * Protocol message flags.
      25             :  */
      26             : #define LOGICALREP_IS_REPLICA_IDENTITY 1
      27             : 
      28             : #define MESSAGE_TRANSACTIONAL (1<<0)
      29             : #define TRUNCATE_CASCADE        (1<<0)
      30             : #define TRUNCATE_RESTART_SEQS   (1<<1)
      31             : 
      32             : static void logicalrep_write_attrs(StringInfo out, Relation rel,
      33             :                                    Bitmapset *columns,
      34             :                                    PublishGencolsType include_gencols_type);
      35             : static void logicalrep_write_tuple(StringInfo out, Relation rel,
      36             :                                    TupleTableSlot *slot,
      37             :                                    bool binary, Bitmapset *columns,
      38             :                                    PublishGencolsType include_gencols_type);
      39             : static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
      40             : static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
      41             : 
      42             : static void logicalrep_write_namespace(StringInfo out, Oid nspid);
      43             : static const char *logicalrep_read_namespace(StringInfo in);
      44             : 
      45             : /*
      46             :  * Write BEGIN to the output stream.
      47             :  */
      48             : void
      49         896 : logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
      50             : {
      51         896 :     pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
      52             : 
      53             :     /* fixed fields */
      54         896 :     pq_sendint64(out, txn->final_lsn);
      55         896 :     pq_sendint64(out, txn->commit_time);
      56         896 :     pq_sendint32(out, txn->xid);
      57         896 : }
      58             : 
      59             : /*
      60             :  * Read transaction BEGIN from the stream.
      61             :  */
      62             : void
      63         960 : logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
      64             : {
      65             :     /* read fields */
      66         960 :     begin_data->final_lsn = pq_getmsgint64(in);
      67         960 :     if (!XLogRecPtrIsValid(begin_data->final_lsn))
      68           0 :         elog(ERROR, "final_lsn not set in begin message");
      69         960 :     begin_data->committime = pq_getmsgint64(in);
      70         960 :     begin_data->xid = pq_getmsgint(in, 4);
      71         960 : }
      72             : 
      73             : 
      74             : /*
      75             :  * Write COMMIT to the output stream.
      76             :  */
      77             : void
      78         894 : logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
      79             :                         XLogRecPtr commit_lsn)
      80             : {
      81         894 :     uint8       flags = 0;
      82             : 
      83         894 :     pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
      84             : 
      85             :     /* send the flags field (unused for now) */
      86         894 :     pq_sendbyte(out, flags);
      87             : 
      88             :     /* send fields */
      89         894 :     pq_sendint64(out, commit_lsn);
      90         894 :     pq_sendint64(out, txn->end_lsn);
      91         894 :     pq_sendint64(out, txn->commit_time);
      92         894 : }
      93             : 
      94             : /*
      95             :  * Read transaction COMMIT from the stream.
      96             :  */
      97             : void
      98         876 : logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
      99             : {
     100             :     /* read flags (unused for now) */
     101         876 :     uint8       flags = pq_getmsgbyte(in);
     102             : 
     103         876 :     if (flags != 0)
     104           0 :         elog(ERROR, "unrecognized flags %u in commit message", flags);
     105             : 
     106             :     /* read fields */
     107         876 :     commit_data->commit_lsn = pq_getmsgint64(in);
     108         876 :     commit_data->end_lsn = pq_getmsgint64(in);
     109         876 :     commit_data->committime = pq_getmsgint64(in);
     110         876 : }
     111             : 
     112             : /*
     113             :  * Write BEGIN PREPARE to the output stream.
     114             :  */
     115             : void
     116          34 : logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
     117             : {
     118          34 :     pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE);
     119             : 
     120             :     /* fixed fields */
     121          34 :     pq_sendint64(out, txn->final_lsn);
     122          34 :     pq_sendint64(out, txn->end_lsn);
     123          34 :     pq_sendint64(out, txn->prepare_time);
     124          34 :     pq_sendint32(out, txn->xid);
     125             : 
     126             :     /* send gid */
     127          34 :     pq_sendstring(out, txn->gid);
     128          34 : }
     129             : 
     130             : /*
     131             :  * Read transaction BEGIN PREPARE from the stream.
     132             :  */
     133             : void
     134          32 : logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
     135             : {
     136             :     /* read fields */
     137          32 :     begin_data->prepare_lsn = pq_getmsgint64(in);
     138          32 :     if (!XLogRecPtrIsValid(begin_data->prepare_lsn))
     139           0 :         elog(ERROR, "prepare_lsn not set in begin prepare message");
     140          32 :     begin_data->end_lsn = pq_getmsgint64(in);
     141          32 :     if (!XLogRecPtrIsValid(begin_data->end_lsn))
     142           0 :         elog(ERROR, "end_lsn not set in begin prepare message");
     143          32 :     begin_data->prepare_time = pq_getmsgint64(in);
     144          32 :     begin_data->xid = pq_getmsgint(in, 4);
     145             : 
     146             :     /* read gid (copy it into a pre-allocated buffer) */
     147          32 :     strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
     148          32 : }
     149             : 
     150             : /*
     151             :  * The core functionality for logicalrep_write_prepare and
     152             :  * logicalrep_write_stream_prepare.
     153             :  */
     154             : static void
     155          62 : logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
     156             :                                 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
     157             : {
     158          62 :     uint8       flags = 0;
     159             : 
     160          62 :     pq_sendbyte(out, type);
     161             : 
     162             :     /*
     163             :      * This should only ever happen for two-phase commit transactions, in
     164             :      * which case we expect to have a valid GID.
     165             :      */
     166             :     Assert(txn->gid != NULL);
     167             :     Assert(rbtxn_is_prepared(txn));
     168             :     Assert(TransactionIdIsValid(txn->xid));
     169             : 
     170             :     /* send the flags field */
     171          62 :     pq_sendbyte(out, flags);
     172             : 
     173             :     /* send fields */
     174          62 :     pq_sendint64(out, prepare_lsn);
     175          62 :     pq_sendint64(out, txn->end_lsn);
     176          62 :     pq_sendint64(out, txn->prepare_time);
     177          62 :     pq_sendint32(out, txn->xid);
     178             : 
     179             :     /* send gid */
     180          62 :     pq_sendstring(out, txn->gid);
     181          62 : }
     182             : 
     183             : /*
     184             :  * Write PREPARE to the output stream.
     185             :  */
     186             : void
     187          34 : logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
     188             :                          XLogRecPtr prepare_lsn)
     189             : {
     190          34 :     logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
     191             :                                     txn, prepare_lsn);
     192          34 : }
     193             : 
     194             : /*
     195             :  * The core functionality for logicalrep_read_prepare and
     196             :  * logicalrep_read_stream_prepare.
     197             :  */
     198             : static void
     199          52 : logicalrep_read_prepare_common(StringInfo in, char *msgtype,
     200             :                                LogicalRepPreparedTxnData *prepare_data)
     201             : {
     202             :     /* read flags */
     203          52 :     uint8       flags = pq_getmsgbyte(in);
     204             : 
     205          52 :     if (flags != 0)
     206           0 :         elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
     207             : 
     208             :     /* read fields */
     209          52 :     prepare_data->prepare_lsn = pq_getmsgint64(in);
     210          52 :     if (!XLogRecPtrIsValid(prepare_data->prepare_lsn))
     211           0 :         elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
     212          52 :     prepare_data->end_lsn = pq_getmsgint64(in);
     213          52 :     if (!XLogRecPtrIsValid(prepare_data->end_lsn))
     214           0 :         elog(ERROR, "end_lsn is not set in %s message", msgtype);
     215          52 :     prepare_data->prepare_time = pq_getmsgint64(in);
     216          52 :     prepare_data->xid = pq_getmsgint(in, 4);
     217          52 :     if (prepare_data->xid == InvalidTransactionId)
     218           0 :         elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
     219             : 
     220             :     /* read gid (copy it into a pre-allocated buffer) */
     221          52 :     strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
     222          52 : }
     223             : 
     224             : /*
     225             :  * Read transaction PREPARE from the stream.
     226             :  */
     227             : void
     228          30 : logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
     229             : {
     230          30 :     logicalrep_read_prepare_common(in, "prepare", prepare_data);
     231          30 : }
     232             : 
     233             : /*
     234             :  * Write COMMIT PREPARED to the output stream.
     235             :  */
     236             : void
     237          48 : logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
     238             :                                  XLogRecPtr commit_lsn)
     239             : {
     240          48 :     uint8       flags = 0;
     241             : 
     242          48 :     pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
     243             : 
     244             :     /*
     245             :      * This should only ever happen for two-phase commit transactions, in
     246             :      * which case we expect to have a valid GID.
     247             :      */
     248             :     Assert(txn->gid != NULL);
     249             : 
     250             :     /* send the flags field */
     251          48 :     pq_sendbyte(out, flags);
     252             : 
     253             :     /* send fields */
     254          48 :     pq_sendint64(out, commit_lsn);
     255          48 :     pq_sendint64(out, txn->end_lsn);
     256          48 :     pq_sendint64(out, txn->commit_time);
     257          48 :     pq_sendint32(out, txn->xid);
     258             : 
     259             :     /* send gid */
     260          48 :     pq_sendstring(out, txn->gid);
     261          48 : }
     262             : 
     263             : /*
     264             :  * Read transaction COMMIT PREPARED from the stream.
     265             :  */
     266             : void
     267          40 : logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
     268             : {
     269             :     /* read flags */
     270          40 :     uint8       flags = pq_getmsgbyte(in);
     271             : 
     272          40 :     if (flags != 0)
     273           0 :         elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
     274             : 
     275             :     /* read fields */
     276          40 :     prepare_data->commit_lsn = pq_getmsgint64(in);
     277          40 :     if (!XLogRecPtrIsValid(prepare_data->commit_lsn))
     278           0 :         elog(ERROR, "commit_lsn is not set in commit prepared message");
     279          40 :     prepare_data->end_lsn = pq_getmsgint64(in);
     280          40 :     if (!XLogRecPtrIsValid(prepare_data->end_lsn))
     281           0 :         elog(ERROR, "end_lsn is not set in commit prepared message");
     282          40 :     prepare_data->commit_time = pq_getmsgint64(in);
     283          40 :     prepare_data->xid = pq_getmsgint(in, 4);
     284             : 
     285             :     /* read gid (copy it into a pre-allocated buffer) */
     286          40 :     strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
     287          40 : }
     288             : 
     289             : /*
     290             :  * Write ROLLBACK PREPARED to the output stream.
     291             :  */
     292             : void
     293          14 : logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
     294             :                                    XLogRecPtr prepare_end_lsn,
     295             :                                    TimestampTz prepare_time)
     296             : {
     297          14 :     uint8       flags = 0;
     298             : 
     299          14 :     pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
     300             : 
     301             :     /*
     302             :      * This should only ever happen for two-phase commit transactions, in
     303             :      * which case we expect to have a valid GID.
     304             :      */
     305             :     Assert(txn->gid != NULL);
     306             : 
     307             :     /* send the flags field */
     308          14 :     pq_sendbyte(out, flags);
     309             : 
     310             :     /* send fields */
     311          14 :     pq_sendint64(out, prepare_end_lsn);
     312          14 :     pq_sendint64(out, txn->end_lsn);
     313          14 :     pq_sendint64(out, prepare_time);
     314          14 :     pq_sendint64(out, txn->commit_time);
     315          14 :     pq_sendint32(out, txn->xid);
     316             : 
     317             :     /* send gid */
     318          14 :     pq_sendstring(out, txn->gid);
     319          14 : }
     320             : 
     321             : /*
     322             :  * Read transaction ROLLBACK PREPARED from the stream.
     323             :  */
     324             : void
     325          10 : logicalrep_read_rollback_prepared(StringInfo in,
     326             :                                   LogicalRepRollbackPreparedTxnData *rollback_data)
     327             : {
     328             :     /* read flags */
     329          10 :     uint8       flags = pq_getmsgbyte(in);
     330             : 
     331          10 :     if (flags != 0)
     332           0 :         elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
     333             : 
     334             :     /* read fields */
     335          10 :     rollback_data->prepare_end_lsn = pq_getmsgint64(in);
     336          10 :     if (!XLogRecPtrIsValid(rollback_data->prepare_end_lsn))
     337           0 :         elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
     338          10 :     rollback_data->rollback_end_lsn = pq_getmsgint64(in);
     339          10 :     if (!XLogRecPtrIsValid(rollback_data->rollback_end_lsn))
     340           0 :         elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
     341          10 :     rollback_data->prepare_time = pq_getmsgint64(in);
     342          10 :     rollback_data->rollback_time = pq_getmsgint64(in);
     343          10 :     rollback_data->xid = pq_getmsgint(in, 4);
     344             : 
     345             :     /* read gid (copy it into a pre-allocated buffer) */
     346          10 :     strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
     347          10 : }
     348             : 
     349             : /*
     350             :  * Write STREAM PREPARE to the output stream.
     351             :  */
     352             : void
     353          28 : logicalrep_write_stream_prepare(StringInfo out,
     354             :                                 ReorderBufferTXN *txn,
     355             :                                 XLogRecPtr prepare_lsn)
     356             : {
     357          28 :     logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE,
     358             :                                     txn, prepare_lsn);
     359          28 : }
     360             : 
     361             : /*
     362             :  * Read STREAM PREPARE from the stream.
     363             :  */
     364             : void
     365          22 : logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
     366             : {
     367          22 :     logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
     368          22 : }
     369             : 
     370             : /*
     371             :  * Write ORIGIN to the output stream.
     372             :  */
     373             : void
     374          20 : logicalrep_write_origin(StringInfo out, const char *origin,
     375             :                         XLogRecPtr origin_lsn)
     376             : {
     377          20 :     pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
     378             : 
     379             :     /* fixed fields */
     380          20 :     pq_sendint64(out, origin_lsn);
     381             : 
     382             :     /* origin string */
     383          20 :     pq_sendstring(out, origin);
     384          20 : }
     385             : 
     386             : /*
     387             :  * Read ORIGIN from the output stream.
     388             :  */
     389             : char *
     390           0 : logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
     391             : {
     392             :     /* fixed fields */
     393           0 :     *origin_lsn = pq_getmsgint64(in);
     394             : 
     395             :     /* return origin */
     396           0 :     return pstrdup(pq_getmsgstring(in));
     397             : }
     398             : 
     399             : /*
     400             :  * Write INSERT to the output stream.
     401             :  */
     402             : void
     403      211838 : logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
     404             :                         TupleTableSlot *newslot, bool binary,
     405             :                         Bitmapset *columns,
     406             :                         PublishGencolsType include_gencols_type)
     407             : {
     408      211838 :     pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
     409             : 
     410             :     /* transaction ID (if not valid, we're not streaming) */
     411      211838 :     if (TransactionIdIsValid(xid))
     412      200168 :         pq_sendint32(out, xid);
     413             : 
     414             :     /* use Oid as relation identifier */
     415      211838 :     pq_sendint32(out, RelationGetRelid(rel));
     416             : 
     417      211838 :     pq_sendbyte(out, 'N');      /* new tuple follows */
     418      211838 :     logicalrep_write_tuple(out, rel, newslot, binary, columns,
     419             :                            include_gencols_type);
     420      211838 : }
     421             : 
     422             : /*
     423             :  * Read INSERT from stream.
     424             :  *
     425             :  * Fills the new tuple.
     426             :  */
     427             : LogicalRepRelId
     428      152188 : logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
     429             : {
     430             :     char        action;
     431             :     LogicalRepRelId relid;
     432             : 
     433             :     /* read the relation id */
     434      152188 :     relid = pq_getmsgint(in, 4);
     435             : 
     436      152188 :     action = pq_getmsgbyte(in);
     437      152188 :     if (action != 'N')
     438           0 :         elog(ERROR, "expected new tuple but got %d",
     439             :              action);
     440             : 
     441      152188 :     logicalrep_read_tuple(in, newtup);
     442             : 
     443      152188 :     return relid;
     444             : }
     445             : 
     446             : /*
     447             :  * Write UPDATE to the output stream.
     448             :  */
     449             : void
     450       68892 : logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
     451             :                         TupleTableSlot *oldslot, TupleTableSlot *newslot,
     452             :                         bool binary, Bitmapset *columns,
     453             :                         PublishGencolsType include_gencols_type)
     454             : {
     455       68892 :     pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
     456             : 
     457             :     Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
     458             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     459             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
     460             : 
     461             :     /* transaction ID (if not valid, we're not streaming) */
     462       68892 :     if (TransactionIdIsValid(xid))
     463       68464 :         pq_sendint32(out, xid);
     464             : 
     465             :     /* use Oid as relation identifier */
     466       68892 :     pq_sendint32(out, RelationGetRelid(rel));
     467             : 
     468       68892 :     if (oldslot != NULL)
     469             :     {
     470         266 :         if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     471         126 :             pq_sendbyte(out, 'O');  /* old tuple follows */
     472             :         else
     473         140 :             pq_sendbyte(out, 'K');  /* old key follows */
     474         266 :         logicalrep_write_tuple(out, rel, oldslot, binary, columns,
     475             :                                include_gencols_type);
     476             :     }
     477             : 
     478       68892 :     pq_sendbyte(out, 'N');      /* new tuple follows */
     479       68892 :     logicalrep_write_tuple(out, rel, newslot, binary, columns,
     480             :                            include_gencols_type);
     481       68892 : }
     482             : 
     483             : /*
     484             :  * Read UPDATE from stream.
     485             :  */
     486             : LogicalRepRelId
     487       63880 : logicalrep_read_update(StringInfo in, bool *has_oldtuple,
     488             :                        LogicalRepTupleData *oldtup,
     489             :                        LogicalRepTupleData *newtup)
     490             : {
     491             :     char        action;
     492             :     LogicalRepRelId relid;
     493             : 
     494             :     /* read the relation id */
     495       63880 :     relid = pq_getmsgint(in, 4);
     496             : 
     497             :     /* read and verify action */
     498       63880 :     action = pq_getmsgbyte(in);
     499       63880 :     if (action != 'K' && action != 'O' && action != 'N')
     500           0 :         elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
     501             :              action);
     502             : 
     503             :     /* check for old tuple */
     504       63880 :     if (action == 'K' || action == 'O')
     505             :     {
     506         270 :         logicalrep_read_tuple(in, oldtup);
     507         270 :         *has_oldtuple = true;
     508             : 
     509         270 :         action = pq_getmsgbyte(in);
     510             :     }
     511             :     else
     512       63610 :         *has_oldtuple = false;
     513             : 
     514             :     /* check for new  tuple */
     515       63880 :     if (action != 'N')
     516           0 :         elog(ERROR, "expected action 'N', got %c",
     517             :              action);
     518             : 
     519       63880 :     logicalrep_read_tuple(in, newtup);
     520             : 
     521       63880 :     return relid;
     522             : }
     523             : 
     524             : /*
     525             :  * Write DELETE to the output stream.
     526             :  */
     527             : void
     528       83772 : logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
     529             :                         TupleTableSlot *oldslot, bool binary,
     530             :                         Bitmapset *columns,
     531             :                         PublishGencolsType include_gencols_type)
     532             : {
     533             :     Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
     534             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     535             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
     536             : 
     537       83772 :     pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
     538             : 
     539             :     /* transaction ID (if not valid, we're not streaming) */
     540       83772 :     if (TransactionIdIsValid(xid))
     541       83250 :         pq_sendint32(out, xid);
     542             : 
     543             :     /* use Oid as relation identifier */
     544       83772 :     pq_sendint32(out, RelationGetRelid(rel));
     545             : 
     546       83772 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     547         258 :         pq_sendbyte(out, 'O');  /* old tuple follows */
     548             :     else
     549       83514 :         pq_sendbyte(out, 'K');  /* old key follows */
     550             : 
     551       83772 :     logicalrep_write_tuple(out, rel, oldslot, binary, columns,
     552             :                            include_gencols_type);
     553       83772 : }
     554             : 
     555             : /*
     556             :  * Read DELETE from stream.
     557             :  *
     558             :  * Fills the old tuple.
     559             :  */
     560             : LogicalRepRelId
     561       80642 : logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
     562             : {
     563             :     char        action;
     564             :     LogicalRepRelId relid;
     565             : 
     566             :     /* read the relation id */
     567       80642 :     relid = pq_getmsgint(in, 4);
     568             : 
     569             :     /* read and verify action */
     570       80642 :     action = pq_getmsgbyte(in);
     571       80642 :     if (action != 'K' && action != 'O')
     572           0 :         elog(ERROR, "expected action 'O' or 'K', got %c", action);
     573             : 
     574       80642 :     logicalrep_read_tuple(in, oldtup);
     575             : 
     576       80642 :     return relid;
     577             : }
     578             : 
     579             : /*
     580             :  * Write TRUNCATE to the output stream.
     581             :  */
     582             : void
     583          22 : logicalrep_write_truncate(StringInfo out,
     584             :                           TransactionId xid,
     585             :                           int nrelids,
     586             :                           Oid relids[],
     587             :                           bool cascade, bool restart_seqs)
     588             : {
     589             :     int         i;
     590          22 :     uint8       flags = 0;
     591             : 
     592          22 :     pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
     593             : 
     594             :     /* transaction ID (if not valid, we're not streaming) */
     595          22 :     if (TransactionIdIsValid(xid))
     596           0 :         pq_sendint32(out, xid);
     597             : 
     598          22 :     pq_sendint32(out, nrelids);
     599             : 
     600             :     /* encode and send truncate flags */
     601          22 :     if (cascade)
     602           0 :         flags |= TRUNCATE_CASCADE;
     603          22 :     if (restart_seqs)
     604           0 :         flags |= TRUNCATE_RESTART_SEQS;
     605          22 :     pq_sendint8(out, flags);
     606             : 
     607          56 :     for (i = 0; i < nrelids; i++)
     608          34 :         pq_sendint32(out, relids[i]);
     609          22 : }
     610             : 
     611             : /*
     612             :  * Read TRUNCATE from stream.
     613             :  */
     614             : List *
     615          40 : logicalrep_read_truncate(StringInfo in,
     616             :                          bool *cascade, bool *restart_seqs)
     617             : {
     618             :     int         i;
     619             :     int         nrelids;
     620          40 :     List       *relids = NIL;
     621             :     uint8       flags;
     622             : 
     623          40 :     nrelids = pq_getmsgint(in, 4);
     624             : 
     625             :     /* read and decode truncate flags */
     626          40 :     flags = pq_getmsgint(in, 1);
     627          40 :     *cascade = (flags & TRUNCATE_CASCADE) > 0;
     628          40 :     *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
     629             : 
     630          98 :     for (i = 0; i < nrelids; i++)
     631          58 :         relids = lappend_oid(relids, pq_getmsgint(in, 4));
     632             : 
     633          40 :     return relids;
     634             : }
     635             : 
     636             : /*
     637             :  * Write MESSAGE to stream
     638             :  */
     639             : void
     640          10 : logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
     641             :                          bool transactional, const char *prefix, Size sz,
     642             :                          const char *message)
     643             : {
     644          10 :     uint8       flags = 0;
     645             : 
     646          10 :     pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
     647             : 
     648             :     /* encode and send message flags */
     649          10 :     if (transactional)
     650           4 :         flags |= MESSAGE_TRANSACTIONAL;
     651             : 
     652             :     /* transaction ID (if not valid, we're not streaming) */
     653          10 :     if (TransactionIdIsValid(xid))
     654           0 :         pq_sendint32(out, xid);
     655             : 
     656          10 :     pq_sendint8(out, flags);
     657          10 :     pq_sendint64(out, lsn);
     658          10 :     pq_sendstring(out, prefix);
     659          10 :     pq_sendint32(out, sz);
     660          10 :     pq_sendbytes(out, message, sz);
     661          10 : }
     662             : 
     663             : /*
     664             :  * Write relation description to the output stream.
     665             :  */
     666             : void
     667         746 : logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
     668             :                      Bitmapset *columns,
     669             :                      PublishGencolsType include_gencols_type)
     670             : {
     671             :     char       *relname;
     672             : 
     673         746 :     pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
     674             : 
     675             :     /* transaction ID (if not valid, we're not streaming) */
     676         746 :     if (TransactionIdIsValid(xid))
     677         144 :         pq_sendint32(out, xid);
     678             : 
     679             :     /* use Oid as relation identifier */
     680         746 :     pq_sendint32(out, RelationGetRelid(rel));
     681             : 
     682             :     /* send qualified relation name */
     683         746 :     logicalrep_write_namespace(out, RelationGetNamespace(rel));
     684         746 :     relname = RelationGetRelationName(rel);
     685         746 :     pq_sendstring(out, relname);
     686             : 
     687             :     /* send replica identity */
     688         746 :     pq_sendbyte(out, rel->rd_rel->relreplident);
     689             : 
     690             :     /* send the attribute info */
     691         746 :     logicalrep_write_attrs(out, rel, columns, include_gencols_type);
     692         746 : }
     693             : 
     694             : /*
     695             :  * Read the relation info from stream and return as LogicalRepRelation.
     696             :  */
     697             : LogicalRepRelation *
     698         856 : logicalrep_read_rel(StringInfo in)
     699             : {
     700         856 :     LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
     701             : 
     702         856 :     rel->remoteid = pq_getmsgint(in, 4);
     703             : 
     704             :     /* Read relation name from stream */
     705         856 :     rel->nspname = pstrdup(logicalrep_read_namespace(in));
     706         856 :     rel->relname = pstrdup(pq_getmsgstring(in));
     707             : 
     708             :     /* Read the replica identity. */
     709         856 :     rel->replident = pq_getmsgbyte(in);
     710             : 
     711             :     /* relkind is not sent */
     712         856 :     rel->relkind = 0;
     713             : 
     714             :     /* Get attribute description */
     715         856 :     logicalrep_read_attrs(in, rel);
     716             : 
     717         856 :     return rel;
     718             : }
     719             : 
     720             : /*
     721             :  * Write type info to the output stream.
     722             :  *
     723             :  * This function will always write base type info.
     724             :  */
     725             : void
     726          36 : logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
     727             : {
     728          36 :     Oid         basetypoid = getBaseType(typoid);
     729             :     HeapTuple   tup;
     730             :     Form_pg_type typtup;
     731             : 
     732          36 :     pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
     733             : 
     734             :     /* transaction ID (if not valid, we're not streaming) */
     735          36 :     if (TransactionIdIsValid(xid))
     736           0 :         pq_sendint32(out, xid);
     737             : 
     738          36 :     tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
     739          36 :     if (!HeapTupleIsValid(tup))
     740           0 :         elog(ERROR, "cache lookup failed for type %u", basetypoid);
     741          36 :     typtup = (Form_pg_type) GETSTRUCT(tup);
     742             : 
     743             :     /* use Oid as type identifier */
     744          36 :     pq_sendint32(out, typoid);
     745             : 
     746             :     /* send qualified type name */
     747          36 :     logicalrep_write_namespace(out, typtup->typnamespace);
     748          36 :     pq_sendstring(out, NameStr(typtup->typname));
     749             : 
     750          36 :     ReleaseSysCache(tup);
     751          36 : }
     752             : 
     753             : /*
     754             :  * Read type info from the output stream.
     755             :  */
     756             : void
     757          36 : logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
     758             : {
     759          36 :     ltyp->remoteid = pq_getmsgint(in, 4);
     760             : 
     761             :     /* Read type name from stream */
     762          36 :     ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
     763          36 :     ltyp->typname = pstrdup(pq_getmsgstring(in));
     764          36 : }
     765             : 
     766             : /*
     767             :  * Write a tuple to the outputstream, in the most efficient format possible.
     768             :  */
     769             : static void
     770      364768 : logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
     771             :                        bool binary, Bitmapset *columns,
     772             :                        PublishGencolsType include_gencols_type)
     773             : {
     774             :     TupleDesc   desc;
     775             :     Datum      *values;
     776             :     bool       *isnull;
     777             :     int         i;
     778      364768 :     uint16      nliveatts = 0;
     779             : 
     780      364768 :     desc = RelationGetDescr(rel);
     781             : 
     782     1097334 :     for (i = 0; i < desc->natts; i++)
     783             :     {
     784      732566 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     785             : 
     786      732566 :         if (!logicalrep_should_publish_column(att, columns,
     787             :                                               include_gencols_type))
     788         282 :             continue;
     789             : 
     790      732284 :         nliveatts++;
     791             :     }
     792      364768 :     pq_sendint16(out, nliveatts);
     793             : 
     794      364768 :     slot_getallattrs(slot);
     795      364768 :     values = slot->tts_values;
     796      364768 :     isnull = slot->tts_isnull;
     797             : 
     798             :     /* Write the values */
     799     1097334 :     for (i = 0; i < desc->natts; i++)
     800             :     {
     801             :         HeapTuple   typtup;
     802             :         Form_pg_type typclass;
     803      732566 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     804             : 
     805      732566 :         if (!logicalrep_should_publish_column(att, columns,
     806             :                                               include_gencols_type))
     807         282 :             continue;
     808             : 
     809      732284 :         if (isnull[i])
     810             :         {
     811      103852 :             pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
     812      103852 :             continue;
     813             :         }
     814             : 
     815      628432 :         if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(values[i])))
     816             :         {
     817             :             /*
     818             :              * Unchanged toasted datum.  (Note that we don't promise to detect
     819             :              * unchanged data in general; this is just a cheap check to avoid
     820             :              * sending large values unnecessarily.)
     821             :              */
     822           6 :             pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
     823           6 :             continue;
     824             :         }
     825             : 
     826      628426 :         typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
     827      628426 :         if (!HeapTupleIsValid(typtup))
     828           0 :             elog(ERROR, "cache lookup failed for type %u", att->atttypid);
     829      628426 :         typclass = (Form_pg_type) GETSTRUCT(typtup);
     830             : 
     831             :         /*
     832             :          * Send in binary if requested and type has suitable send function.
     833             :          */
     834      628426 :         if (binary && OidIsValid(typclass->typsend))
     835      230086 :         {
     836             :             bytea      *outputbytes;
     837             :             int         len;
     838             : 
     839      230086 :             pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
     840      230086 :             outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
     841      230086 :             len = VARSIZE(outputbytes) - VARHDRSZ;
     842      230086 :             pq_sendint(out, len, 4);    /* length */
     843      230086 :             pq_sendbytes(out, VARDATA(outputbytes), len);   /* data */
     844      230086 :             pfree(outputbytes);
     845             :         }
     846             :         else
     847             :         {
     848             :             char       *outputstr;
     849             : 
     850      398340 :             pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
     851      398340 :             outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
     852      398340 :             pq_sendcountedtext(out, outputstr, strlen(outputstr));
     853      398340 :             pfree(outputstr);
     854             :         }
     855             : 
     856      628426 :         ReleaseSysCache(typtup);
     857             :     }
     858      364768 : }
     859             : 
     860             : /*
     861             :  * Read tuple in logical replication format from stream.
     862             :  */
     863             : static void
     864      296980 : logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
     865             : {
     866             :     int         i;
     867             :     int         natts;
     868             : 
     869             :     /* Get number of attributes */
     870      296980 :     natts = pq_getmsgint(in, 2);
     871             : 
     872             :     /* Allocate space for per-column values; zero out unused StringInfoDatas */
     873      296980 :     tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
     874      296980 :     tuple->colstatus = (char *) palloc(natts * sizeof(char));
     875      296980 :     tuple->ncols = natts;
     876             : 
     877             :     /* Read the data */
     878      903324 :     for (i = 0; i < natts; i++)
     879             :     {
     880             :         char       *buff;
     881             :         char        kind;
     882             :         int         len;
     883      606344 :         StringInfo  value = &tuple->colvalues[i];
     884             : 
     885      606344 :         kind = pq_getmsgbyte(in);
     886      606344 :         tuple->colstatus[i] = kind;
     887             : 
     888      606344 :         switch (kind)
     889             :         {
     890      100744 :             case LOGICALREP_COLUMN_NULL:
     891             :                 /* nothing more to do */
     892      100744 :                 break;
     893           6 :             case LOGICALREP_COLUMN_UNCHANGED:
     894             :                 /* we don't receive the value of an unchanged column */
     895           6 :                 break;
     896      505594 :             case LOGICALREP_COLUMN_TEXT:
     897             :             case LOGICALREP_COLUMN_BINARY:
     898      505594 :                 len = pq_getmsgint(in, 4);  /* read length */
     899             : 
     900             :                 /* and data */
     901      505594 :                 buff = palloc(len + 1);
     902      505594 :                 pq_copymsgbytes(in, buff, len);
     903             : 
     904             :                 /*
     905             :                  * NUL termination is required for LOGICALREP_COLUMN_TEXT mode
     906             :                  * as input functions require that.  For
     907             :                  * LOGICALREP_COLUMN_BINARY it's not technically required, but
     908             :                  * it's harmless.
     909             :                  */
     910      505594 :                 buff[len] = '\0';
     911             : 
     912      505594 :                 initStringInfoFromString(value, buff, len);
     913      505594 :                 break;
     914           0 :             default:
     915           0 :                 elog(ERROR, "unrecognized data representation type '%c'", kind);
     916             :         }
     917             :     }
     918      296980 : }
     919             : 
     920             : /*
     921             :  * Write relation attribute metadata to the stream.
     922             :  */
     923             : static void
     924         746 : logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns,
     925             :                        PublishGencolsType include_gencols_type)
     926             : {
     927             :     TupleDesc   desc;
     928             :     int         i;
     929         746 :     uint16      nliveatts = 0;
     930         746 :     Bitmapset  *idattrs = NULL;
     931             :     bool        replidentfull;
     932             : 
     933         746 :     desc = RelationGetDescr(rel);
     934             : 
     935             :     /* send number of live attributes */
     936        2316 :     for (i = 0; i < desc->natts; i++)
     937             :     {
     938        1570 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     939             : 
     940        1570 :         if (!logicalrep_should_publish_column(att, columns,
     941             :                                               include_gencols_type))
     942         142 :             continue;
     943             : 
     944        1428 :         nliveatts++;
     945             :     }
     946         746 :     pq_sendint16(out, nliveatts);
     947             : 
     948             :     /* fetch bitmap of REPLICATION IDENTITY attributes */
     949         746 :     replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
     950         746 :     if (!replidentfull)
     951         634 :         idattrs = RelationGetIdentityKeyBitmap(rel);
     952             : 
     953             :     /* send the attributes */
     954        2316 :     for (i = 0; i < desc->natts; i++)
     955             :     {
     956        1570 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     957        1570 :         uint8       flags = 0;
     958             : 
     959        1570 :         if (!logicalrep_should_publish_column(att, columns,
     960             :                                               include_gencols_type))
     961         142 :             continue;
     962             : 
     963             :         /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
     964        2684 :         if (replidentfull ||
     965        1256 :             bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
     966             :                           idattrs))
     967         678 :             flags |= LOGICALREP_IS_REPLICA_IDENTITY;
     968             : 
     969        1428 :         pq_sendbyte(out, flags);
     970             : 
     971             :         /* attribute name */
     972        1428 :         pq_sendstring(out, NameStr(att->attname));
     973             : 
     974             :         /* attribute type id */
     975        1428 :         pq_sendint32(out, (int) att->atttypid);
     976             : 
     977             :         /* attribute mode */
     978        1428 :         pq_sendint32(out, att->atttypmod);
     979             :     }
     980             : 
     981         746 :     bms_free(idattrs);
     982         746 : }
     983             : 
     984             : /*
     985             :  * Read relation attribute metadata from the stream.
     986             :  */
     987             : static void
     988         856 : logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
     989             : {
     990             :     int         i;
     991             :     int         natts;
     992             :     char      **attnames;
     993             :     Oid        *atttyps;
     994         856 :     Bitmapset  *attkeys = NULL;
     995             : 
     996         856 :     natts = pq_getmsgint(in, 2);
     997         856 :     attnames = palloc(natts * sizeof(char *));
     998         856 :     atttyps = palloc(natts * sizeof(Oid));
     999             : 
    1000             :     /* read the attributes */
    1001        2464 :     for (i = 0; i < natts; i++)
    1002             :     {
    1003             :         uint8       flags;
    1004             : 
    1005             :         /* Check for replica identity column */
    1006        1608 :         flags = pq_getmsgbyte(in);
    1007        1608 :         if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
    1008         764 :             attkeys = bms_add_member(attkeys, i);
    1009             : 
    1010             :         /* attribute name */
    1011        1608 :         attnames[i] = pstrdup(pq_getmsgstring(in));
    1012             : 
    1013             :         /* attribute type id */
    1014        1608 :         atttyps[i] = (Oid) pq_getmsgint(in, 4);
    1015             : 
    1016             :         /* we ignore attribute mode for now */
    1017        1608 :         (void) pq_getmsgint(in, 4);
    1018             :     }
    1019             : 
    1020         856 :     rel->attnames = attnames;
    1021         856 :     rel->atttyps = atttyps;
    1022         856 :     rel->attkeys = attkeys;
    1023         856 :     rel->natts = natts;
    1024         856 : }
    1025             : 
    1026             : /*
    1027             :  * Write the namespace name or empty string for pg_catalog (to save space).
    1028             :  */
    1029             : static void
    1030         782 : logicalrep_write_namespace(StringInfo out, Oid nspid)
    1031             : {
    1032         782 :     if (nspid == PG_CATALOG_NAMESPACE)
    1033           2 :         pq_sendbyte(out, '\0');
    1034             :     else
    1035             :     {
    1036         780 :         char       *nspname = get_namespace_name(nspid);
    1037             : 
    1038         780 :         if (nspname == NULL)
    1039           0 :             elog(ERROR, "cache lookup failed for namespace %u",
    1040             :                  nspid);
    1041             : 
    1042         780 :         pq_sendstring(out, nspname);
    1043             :     }
    1044         782 : }
    1045             : 
    1046             : /*
    1047             :  * Read the namespace name while treating empty string as pg_catalog.
    1048             :  */
    1049             : static const char *
    1050         892 : logicalrep_read_namespace(StringInfo in)
    1051             : {
    1052         892 :     const char *nspname = pq_getmsgstring(in);
    1053             : 
    1054         892 :     if (nspname[0] == '\0')
    1055           2 :         nspname = "pg_catalog";
    1056             : 
    1057         892 :     return nspname;
    1058             : }
    1059             : 
    1060             : /*
    1061             :  * Write the information for the start stream message to the output stream.
    1062             :  */
    1063             : void
    1064        1276 : logicalrep_write_stream_start(StringInfo out,
    1065             :                               TransactionId xid, bool first_segment)
    1066             : {
    1067        1276 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
    1068             : 
    1069             :     Assert(TransactionIdIsValid(xid));
    1070             : 
    1071             :     /* transaction ID (we're starting to stream, so must be valid) */
    1072        1276 :     pq_sendint32(out, xid);
    1073             : 
    1074             :     /* 1 if this is the first streaming segment for this xid */
    1075        1276 :     pq_sendbyte(out, first_segment ? 1 : 0);
    1076        1276 : }
    1077             : 
    1078             : /*
    1079             :  * Read the information about the start stream message from output stream.
    1080             :  */
    1081             : TransactionId
    1082        1678 : logicalrep_read_stream_start(StringInfo in, bool *first_segment)
    1083             : {
    1084             :     TransactionId xid;
    1085             : 
    1086             :     Assert(first_segment);
    1087             : 
    1088        1678 :     xid = pq_getmsgint(in, 4);
    1089        1678 :     *first_segment = (pq_getmsgbyte(in) == 1);
    1090             : 
    1091        1678 :     return xid;
    1092             : }
    1093             : 
    1094             : /*
    1095             :  * Write the stop stream message to the output stream.
    1096             :  */
    1097             : void
    1098        1276 : logicalrep_write_stream_stop(StringInfo out)
    1099             : {
    1100        1276 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_STOP);
    1101        1276 : }
    1102             : 
    1103             : /*
    1104             :  * Write STREAM COMMIT to the output stream.
    1105             :  */
    1106             : void
    1107          92 : logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
    1108             :                                XLogRecPtr commit_lsn)
    1109             : {
    1110          92 :     uint8       flags = 0;
    1111             : 
    1112          92 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
    1113             : 
    1114             :     Assert(TransactionIdIsValid(txn->xid));
    1115             : 
    1116             :     /* transaction ID */
    1117          92 :     pq_sendint32(out, txn->xid);
    1118             : 
    1119             :     /* send the flags field (unused for now) */
    1120          92 :     pq_sendbyte(out, flags);
    1121             : 
    1122             :     /* send fields */
    1123          92 :     pq_sendint64(out, commit_lsn);
    1124          92 :     pq_sendint64(out, txn->end_lsn);
    1125          92 :     pq_sendint64(out, txn->commit_time);
    1126          92 : }
    1127             : 
    1128             : /*
    1129             :  * Read STREAM COMMIT from the output stream.
    1130             :  */
    1131             : TransactionId
    1132         122 : logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
    1133             : {
    1134             :     TransactionId xid;
    1135             :     uint8       flags;
    1136             : 
    1137         122 :     xid = pq_getmsgint(in, 4);
    1138             : 
    1139             :     /* read flags (unused for now) */
    1140         122 :     flags = pq_getmsgbyte(in);
    1141             : 
    1142         122 :     if (flags != 0)
    1143           0 :         elog(ERROR, "unrecognized flags %u in commit message", flags);
    1144             : 
    1145             :     /* read fields */
    1146         122 :     commit_data->commit_lsn = pq_getmsgint64(in);
    1147         122 :     commit_data->end_lsn = pq_getmsgint64(in);
    1148         122 :     commit_data->committime = pq_getmsgint64(in);
    1149             : 
    1150         122 :     return xid;
    1151             : }
    1152             : 
    1153             : /*
    1154             :  * Write STREAM ABORT to the output stream. Note that xid and subxid will be
    1155             :  * same for the top-level transaction abort.
    1156             :  *
    1157             :  * If write_abort_info is true, send the abort_lsn and abort_time fields,
    1158             :  * otherwise don't.
    1159             :  */
    1160             : void
    1161          52 : logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
    1162             :                               TransactionId subxid, XLogRecPtr abort_lsn,
    1163             :                               TimestampTz abort_time, bool write_abort_info)
    1164             : {
    1165          52 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
    1166             : 
    1167             :     Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
    1168             : 
    1169             :     /* transaction ID */
    1170          52 :     pq_sendint32(out, xid);
    1171          52 :     pq_sendint32(out, subxid);
    1172             : 
    1173          52 :     if (write_abort_info)
    1174             :     {
    1175          24 :         pq_sendint64(out, abort_lsn);
    1176          24 :         pq_sendint64(out, abort_time);
    1177             :     }
    1178          52 : }
    1179             : 
    1180             : /*
    1181             :  * Read STREAM ABORT from the output stream.
    1182             :  *
    1183             :  * If read_abort_info is true, read the abort_lsn and abort_time fields,
    1184             :  * otherwise don't.
    1185             :  */
    1186             : void
    1187          76 : logicalrep_read_stream_abort(StringInfo in,
    1188             :                              LogicalRepStreamAbortData *abort_data,
    1189             :                              bool read_abort_info)
    1190             : {
    1191             :     Assert(abort_data);
    1192             : 
    1193          76 :     abort_data->xid = pq_getmsgint(in, 4);
    1194          76 :     abort_data->subxid = pq_getmsgint(in, 4);
    1195             : 
    1196          76 :     if (read_abort_info)
    1197             :     {
    1198          48 :         abort_data->abort_lsn = pq_getmsgint64(in);
    1199          48 :         abort_data->abort_time = pq_getmsgint64(in);
    1200             :     }
    1201             :     else
    1202             :     {
    1203          28 :         abort_data->abort_lsn = InvalidXLogRecPtr;
    1204          28 :         abort_data->abort_time = 0;
    1205             :     }
    1206          76 : }
    1207             : 
    1208             : /*
    1209             :  * Get string representing LogicalRepMsgType.
    1210             :  */
    1211             : const char *
    1212        2026 : logicalrep_message_type(LogicalRepMsgType action)
    1213             : {
    1214             :     static char err_unknown[20];
    1215             : 
    1216        2026 :     switch (action)
    1217             :     {
    1218           2 :         case LOGICAL_REP_MSG_BEGIN:
    1219           2 :             return "BEGIN";
    1220          12 :         case LOGICAL_REP_MSG_COMMIT:
    1221          12 :             return "COMMIT";
    1222           0 :         case LOGICAL_REP_MSG_ORIGIN:
    1223           0 :             return "ORIGIN";
    1224        1350 :         case LOGICAL_REP_MSG_INSERT:
    1225        1350 :             return "INSERT";
    1226          44 :         case LOGICAL_REP_MSG_UPDATE:
    1227          44 :             return "UPDATE";
    1228          28 :         case LOGICAL_REP_MSG_DELETE:
    1229          28 :             return "DELETE";
    1230           0 :         case LOGICAL_REP_MSG_TRUNCATE:
    1231           0 :             return "TRUNCATE";
    1232           4 :         case LOGICAL_REP_MSG_RELATION:
    1233           4 :             return "RELATION";
    1234           0 :         case LOGICAL_REP_MSG_TYPE:
    1235           0 :             return "TYPE";
    1236           0 :         case LOGICAL_REP_MSG_MESSAGE:
    1237           0 :             return "MESSAGE";
    1238           2 :         case LOGICAL_REP_MSG_BEGIN_PREPARE:
    1239           2 :             return "BEGIN PREPARE";
    1240           4 :         case LOGICAL_REP_MSG_PREPARE:
    1241           4 :             return "PREPARE";
    1242           0 :         case LOGICAL_REP_MSG_COMMIT_PREPARED:
    1243           0 :             return "COMMIT PREPARED";
    1244           0 :         case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
    1245           0 :             return "ROLLBACK PREPARED";
    1246          26 :         case LOGICAL_REP_MSG_STREAM_START:
    1247          26 :             return "STREAM START";
    1248         462 :         case LOGICAL_REP_MSG_STREAM_STOP:
    1249         462 :             return "STREAM STOP";
    1250          50 :         case LOGICAL_REP_MSG_STREAM_COMMIT:
    1251          50 :             return "STREAM COMMIT";
    1252          38 :         case LOGICAL_REP_MSG_STREAM_ABORT:
    1253          38 :             return "STREAM ABORT";
    1254           4 :         case LOGICAL_REP_MSG_STREAM_PREPARE:
    1255           4 :             return "STREAM PREPARE";
    1256             :     }
    1257             : 
    1258             :     /*
    1259             :      * This message provides context in the error raised when applying a
    1260             :      * logical message. So we can't throw an error here. Return an unknown
    1261             :      * indicator value so that the original error is still reported.
    1262             :      */
    1263           0 :     snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);
    1264             : 
    1265           0 :     return err_unknown;
    1266             : }
    1267             : 
    1268             : /*
    1269             :  * Check if the column 'att' of a table should be published.
    1270             :  *
    1271             :  * 'columns' represents the publication column list (if any) for that table.
    1272             :  *
    1273             :  * 'include_gencols_type' value indicates whether generated columns should be
    1274             :  * published when there is no column list. Typically, this will have the same
    1275             :  * value as the 'publish_generated_columns' publication parameter.
    1276             :  *
    1277             :  * Note that generated columns can be published only when present in a
    1278             :  * publication column list, or when include_gencols_type is
    1279             :  * PUBLISH_GENCOLS_STORED.
    1280             :  */
    1281             : bool
    1282     1469842 : logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns,
    1283             :                                  PublishGencolsType include_gencols_type)
    1284             : {
    1285     1469842 :     if (att->attisdropped)
    1286         102 :         return false;
    1287             : 
    1288             :     /* If a column list is provided, publish only the cols in that list. */
    1289     1469740 :     if (columns)
    1290        1898 :         return bms_is_member(att->attnum, columns);
    1291             : 
    1292             :     /* All non-generated columns are always published. */
    1293     1467842 :     if (!att->attgenerated)
    1294     1467728 :         return true;
    1295             : 
    1296             :     /*
    1297             :      * Stored generated columns are only published when the user sets
    1298             :      * publish_generated_columns as stored.
    1299             :      */
    1300         114 :     if (att->attgenerated == ATTRIBUTE_GENERATED_STORED)
    1301          66 :         return include_gencols_type == PUBLISH_GENCOLS_STORED;
    1302             : 
    1303          48 :     return false;
    1304             : }

Generated by: LCOV version 1.16