LCOV - code coverage report
Current view: top level - src/backend/replication/logical - proto.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 90.0 % 498 448
Test Date: 2026-03-05 14:14:39 Functions: 97.8 % 46 45
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * proto.c
       4              :  *      logical replication protocol functions
       5              :  *
       6              :  * Copyright (c) 2015-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *      src/backend/replication/logical/proto.c
      10              :  *
      11              :  *-------------------------------------------------------------------------
      12              :  */
      13              : #include "postgres.h"
      14              : 
      15              : #include "access/sysattr.h"
      16              : #include "catalog/pg_namespace.h"
      17              : #include "catalog/pg_type.h"
      18              : #include "libpq/pqformat.h"
      19              : #include "replication/logicalproto.h"
      20              : #include "utils/lsyscache.h"
      21              : #include "utils/syscache.h"
      22              : 
      23              : /*
      24              :  * Protocol message flags.
      25              :  */
      26              : #define LOGICALREP_IS_REPLICA_IDENTITY 1
      27              : 
      28              : #define MESSAGE_TRANSACTIONAL (1<<0)
      29              : #define TRUNCATE_CASCADE        (1<<0)
      30              : #define TRUNCATE_RESTART_SEQS   (1<<1)
      31              : 
      32              : static void logicalrep_write_attrs(StringInfo out, Relation rel,
      33              :                                    Bitmapset *columns,
      34              :                                    PublishGencolsType include_gencols_type);
      35              : static void logicalrep_write_tuple(StringInfo out, Relation rel,
      36              :                                    TupleTableSlot *slot,
      37              :                                    bool binary, Bitmapset *columns,
      38              :                                    PublishGencolsType include_gencols_type);
      39              : static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
      40              : static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
      41              : 
      42              : static void logicalrep_write_namespace(StringInfo out, Oid nspid);
      43              : static const char *logicalrep_read_namespace(StringInfo in);
      44              : 
      45              : /*
      46              :  * Write BEGIN to the output stream.
      47              :  */
      48              : void
      49          470 : logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
      50              : {
      51          470 :     pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
      52              : 
      53              :     /* fixed fields */
      54          470 :     pq_sendint64(out, txn->final_lsn);
      55          470 :     pq_sendint64(out, txn->commit_time);
      56          470 :     pq_sendint32(out, txn->xid);
      57          470 : }
      58              : 
      59              : /*
      60              :  * Read transaction BEGIN from the stream.
      61              :  */
      62              : void
      63          503 : logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
      64              : {
      65              :     /* read fields */
      66          503 :     begin_data->final_lsn = pq_getmsgint64(in);
      67          503 :     if (!XLogRecPtrIsValid(begin_data->final_lsn))
      68            0 :         elog(ERROR, "final_lsn not set in begin message");
      69          503 :     begin_data->committime = pq_getmsgint64(in);
      70          503 :     begin_data->xid = pq_getmsgint(in, 4);
      71          503 : }
      72              : 
      73              : 
      74              : /*
      75              :  * Write COMMIT to the output stream.
      76              :  */
      77              : void
      78          468 : logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
      79              :                         XLogRecPtr commit_lsn)
      80              : {
      81          468 :     uint8       flags = 0;
      82              : 
      83          468 :     pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
      84              : 
      85              :     /* send the flags field (unused for now) */
      86          468 :     pq_sendbyte(out, flags);
      87              : 
      88              :     /* send fields */
      89          468 :     pq_sendint64(out, commit_lsn);
      90          468 :     pq_sendint64(out, txn->end_lsn);
      91          468 :     pq_sendint64(out, txn->commit_time);
      92          468 : }
      93              : 
      94              : /*
      95              :  * Read transaction COMMIT from the stream.
      96              :  */
      97              : void
      98          442 : logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
      99              : {
     100              :     /* read flags (unused for now) */
     101          442 :     uint8       flags = pq_getmsgbyte(in);
     102              : 
     103          442 :     if (flags != 0)
     104            0 :         elog(ERROR, "unrecognized flags %u in commit message", flags);
     105              : 
     106              :     /* read fields */
     107          442 :     commit_data->commit_lsn = pq_getmsgint64(in);
     108          442 :     commit_data->end_lsn = pq_getmsgint64(in);
     109          442 :     commit_data->committime = pq_getmsgint64(in);
     110          442 : }
     111              : 
     112              : /*
     113              :  * Write BEGIN PREPARE to the output stream.
     114              :  */
     115              : void
     116           17 : logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
     117              : {
     118           17 :     pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE);
     119              : 
     120              :     /* fixed fields */
     121           17 :     pq_sendint64(out, txn->final_lsn);
     122           17 :     pq_sendint64(out, txn->end_lsn);
     123           17 :     pq_sendint64(out, txn->prepare_time);
     124           17 :     pq_sendint32(out, txn->xid);
     125              : 
     126              :     /* send gid */
     127           17 :     pq_sendstring(out, txn->gid);
     128           17 : }
     129              : 
     130              : /*
     131              :  * Read transaction BEGIN PREPARE from the stream.
     132              :  */
     133              : void
     134           17 : logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
     135              : {
     136              :     /* read fields */
     137           17 :     begin_data->prepare_lsn = pq_getmsgint64(in);
     138           17 :     if (!XLogRecPtrIsValid(begin_data->prepare_lsn))
     139            0 :         elog(ERROR, "prepare_lsn not set in begin prepare message");
     140           17 :     begin_data->end_lsn = pq_getmsgint64(in);
     141           17 :     if (!XLogRecPtrIsValid(begin_data->end_lsn))
     142            0 :         elog(ERROR, "end_lsn not set in begin prepare message");
     143           17 :     begin_data->prepare_time = pq_getmsgint64(in);
     144           17 :     begin_data->xid = pq_getmsgint(in, 4);
     145              : 
     146              :     /* read gid (copy it into a pre-allocated buffer) */
     147           17 :     strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
     148           17 : }
     149              : 
     150              : /*
     151              :  * The core functionality for logicalrep_write_prepare and
     152              :  * logicalrep_write_stream_prepare.
     153              :  */
     154              : static void
     155           33 : logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
     156              :                                 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
     157              : {
     158           33 :     uint8       flags = 0;
     159              : 
     160           33 :     pq_sendbyte(out, type);
     161              : 
     162              :     /*
     163              :      * This should only ever happen for two-phase commit transactions, in
     164              :      * which case we expect to have a valid GID.
     165              :      */
     166              :     Assert(txn->gid != NULL);
     167              :     Assert(rbtxn_is_prepared(txn));
     168              :     Assert(TransactionIdIsValid(txn->xid));
     169              : 
     170              :     /* send the flags field */
     171           33 :     pq_sendbyte(out, flags);
     172              : 
     173              :     /* send fields */
     174           33 :     pq_sendint64(out, prepare_lsn);
     175           33 :     pq_sendint64(out, txn->end_lsn);
     176           33 :     pq_sendint64(out, txn->prepare_time);
     177           33 :     pq_sendint32(out, txn->xid);
     178              : 
     179              :     /* send gid */
     180           33 :     pq_sendstring(out, txn->gid);
     181           33 : }
     182              : 
     183              : /*
     184              :  * Write PREPARE to the output stream.
     185              :  */
     186              : void
     187           17 : logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
     188              :                          XLogRecPtr prepare_lsn)
     189              : {
     190           17 :     logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
     191              :                                     txn, prepare_lsn);
     192           17 : }
     193              : 
     194              : /*
     195              :  * The core functionality for logicalrep_read_prepare and
     196              :  * logicalrep_read_stream_prepare.
     197              :  */
     198              : static void
     199           31 : logicalrep_read_prepare_common(StringInfo in, char *msgtype,
     200              :                                LogicalRepPreparedTxnData *prepare_data)
     201              : {
     202              :     /* read flags */
     203           31 :     uint8       flags = pq_getmsgbyte(in);
     204              : 
     205           31 :     if (flags != 0)
     206            0 :         elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
     207              : 
     208              :     /* read fields */
     209           31 :     prepare_data->prepare_lsn = pq_getmsgint64(in);
     210           31 :     if (!XLogRecPtrIsValid(prepare_data->prepare_lsn))
     211            0 :         elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
     212           31 :     prepare_data->end_lsn = pq_getmsgint64(in);
     213           31 :     if (!XLogRecPtrIsValid(prepare_data->end_lsn))
     214            0 :         elog(ERROR, "end_lsn is not set in %s message", msgtype);
     215           31 :     prepare_data->prepare_time = pq_getmsgint64(in);
     216           31 :     prepare_data->xid = pq_getmsgint(in, 4);
     217           31 :     if (prepare_data->xid == InvalidTransactionId)
     218            0 :         elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
     219              : 
     220              :     /* read gid (copy it into a pre-allocated buffer) */
     221           31 :     strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
     222           31 : }
     223              : 
     224              : /*
     225              :  * Read transaction PREPARE from the stream.
     226              :  */
     227              : void
     228           16 : logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
     229              : {
     230           16 :     logicalrep_read_prepare_common(in, "prepare", prepare_data);
     231           16 : }
     232              : 
     233              : /*
     234              :  * Write COMMIT PREPARED to the output stream.
     235              :  */
     236              : void
     237           25 : logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
     238              :                                  XLogRecPtr commit_lsn)
     239              : {
     240           25 :     uint8       flags = 0;
     241              : 
     242           25 :     pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
     243              : 
     244              :     /*
     245              :      * This should only ever happen for two-phase commit transactions, in
     246              :      * which case we expect to have a valid GID.
     247              :      */
     248              :     Assert(txn->gid != NULL);
     249              : 
     250              :     /* send the flags field */
     251           25 :     pq_sendbyte(out, flags);
     252              : 
     253              :     /* send fields */
     254           25 :     pq_sendint64(out, commit_lsn);
     255           25 :     pq_sendint64(out, txn->end_lsn);
     256           25 :     pq_sendint64(out, txn->commit_time);
     257           25 :     pq_sendint32(out, txn->xid);
     258              : 
     259              :     /* send gid */
     260           25 :     pq_sendstring(out, txn->gid);
     261           25 : }
     262              : 
     263              : /*
     264              :  * Read transaction COMMIT PREPARED from the stream.
     265              :  */
     266              : void
     267           22 : logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
     268              : {
     269              :     /* read flags */
     270           22 :     uint8       flags = pq_getmsgbyte(in);
     271              : 
     272           22 :     if (flags != 0)
     273            0 :         elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
     274              : 
     275              :     /* read fields */
     276           22 :     prepare_data->commit_lsn = pq_getmsgint64(in);
     277           22 :     if (!XLogRecPtrIsValid(prepare_data->commit_lsn))
     278            0 :         elog(ERROR, "commit_lsn is not set in commit prepared message");
     279           22 :     prepare_data->end_lsn = pq_getmsgint64(in);
     280           22 :     if (!XLogRecPtrIsValid(prepare_data->end_lsn))
     281            0 :         elog(ERROR, "end_lsn is not set in commit prepared message");
     282           22 :     prepare_data->commit_time = pq_getmsgint64(in);
     283           22 :     prepare_data->xid = pq_getmsgint(in, 4);
     284              : 
     285              :     /* read gid (copy it into a pre-allocated buffer) */
     286           22 :     strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
     287           22 : }
     288              : 
     289              : /*
     290              :  * Write ROLLBACK PREPARED to the output stream.
     291              :  */
     292              : void
     293            7 : logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
     294              :                                    XLogRecPtr prepare_end_lsn,
     295              :                                    TimestampTz prepare_time)
     296              : {
     297            7 :     uint8       flags = 0;
     298              : 
     299            7 :     pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
     300              : 
     301              :     /*
     302              :      * This should only ever happen for two-phase commit transactions, in
     303              :      * which case we expect to have a valid GID.
     304              :      */
     305              :     Assert(txn->gid != NULL);
     306              : 
     307              :     /* send the flags field */
     308            7 :     pq_sendbyte(out, flags);
     309              : 
     310              :     /* send fields */
     311            7 :     pq_sendint64(out, prepare_end_lsn);
     312            7 :     pq_sendint64(out, txn->end_lsn);
     313            7 :     pq_sendint64(out, prepare_time);
     314            7 :     pq_sendint64(out, txn->commit_time);
     315            7 :     pq_sendint32(out, txn->xid);
     316              : 
     317              :     /* send gid */
     318            7 :     pq_sendstring(out, txn->gid);
     319            7 : }
     320              : 
     321              : /*
     322              :  * Read transaction ROLLBACK PREPARED from the stream.
     323              :  */
     324              : void
     325            5 : logicalrep_read_rollback_prepared(StringInfo in,
     326              :                                   LogicalRepRollbackPreparedTxnData *rollback_data)
     327              : {
     328              :     /* read flags */
     329            5 :     uint8       flags = pq_getmsgbyte(in);
     330              : 
     331            5 :     if (flags != 0)
     332            0 :         elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
     333              : 
     334              :     /* read fields */
     335            5 :     rollback_data->prepare_end_lsn = pq_getmsgint64(in);
     336            5 :     if (!XLogRecPtrIsValid(rollback_data->prepare_end_lsn))
     337            0 :         elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
     338            5 :     rollback_data->rollback_end_lsn = pq_getmsgint64(in);
     339            5 :     if (!XLogRecPtrIsValid(rollback_data->rollback_end_lsn))
     340            0 :         elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
     341            5 :     rollback_data->prepare_time = pq_getmsgint64(in);
     342            5 :     rollback_data->rollback_time = pq_getmsgint64(in);
     343            5 :     rollback_data->xid = pq_getmsgint(in, 4);
     344              : 
     345              :     /* read gid (copy it into a pre-allocated buffer) */
     346            5 :     strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
     347            5 : }
     348              : 
     349              : /*
     350              :  * Write STREAM PREPARE to the output stream.
     351              :  */
     352              : void
     353           16 : logicalrep_write_stream_prepare(StringInfo out,
     354              :                                 ReorderBufferTXN *txn,
     355              :                                 XLogRecPtr prepare_lsn)
     356              : {
     357           16 :     logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE,
     358              :                                     txn, prepare_lsn);
     359           16 : }
     360              : 
     361              : /*
     362              :  * Read STREAM PREPARE from the stream.
     363              :  */
     364              : void
     365           15 : logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
     366              : {
     367           15 :     logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
     368           15 : }
     369              : 
     370              : /*
     371              :  * Write ORIGIN to the output stream.
     372              :  */
     373              : void
     374            8 : logicalrep_write_origin(StringInfo out, const char *origin,
     375              :                         XLogRecPtr origin_lsn)
     376              : {
     377            8 :     pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
     378              : 
     379              :     /* fixed fields */
     380            8 :     pq_sendint64(out, origin_lsn);
     381              : 
     382              :     /* origin string */
     383            8 :     pq_sendstring(out, origin);
     384            8 : }
     385              : 
     386              : /*
     387              :  * Read ORIGIN from the stream.
     388              :  */
     389              : char *
     390            0 : logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
     391              : {
     392              :     /* fixed fields */
     393            0 :     *origin_lsn = pq_getmsgint64(in);
     394              : 
     395              :     /* return origin */
     396            0 :     return pstrdup(pq_getmsgstring(in));
     397              : }
     398              : 
     399              : /*
     400              :  * Write INSERT to the output stream.
     401              :  */
     402              : void
     403       105930 : logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
     404              :                         TupleTableSlot *newslot, bool binary,
     405              :                         Bitmapset *columns,
     406              :                         PublishGencolsType include_gencols_type)
     407              : {
     408       105930 :     pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
     409              : 
     410              :     /* transaction ID (if not valid, we're not streaming) */
     411       105930 :     if (TransactionIdIsValid(xid))
     412       100056 :         pq_sendint32(out, xid);
     413              : 
     414              :     /* use Oid as relation identifier */
     415       105930 :     pq_sendint32(out, RelationGetRelid(rel));
     416              : 
     417       105930 :     pq_sendbyte(out, 'N');      /* new tuple follows */
     418       105930 :     logicalrep_write_tuple(out, rel, newslot, binary, columns,
     419              :                            include_gencols_type);
     420       105930 : }
     421              : 
     422              : /*
     423              :  * Read INSERT from stream.
     424              :  *
     425              :  * Fills the new tuple.
     426              :  */
     427              : LogicalRepRelId
     428        75932 : logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
     429              : {
     430              :     char        action;
     431              :     LogicalRepRelId relid;
     432              : 
     433              :     /* read the relation id */
     434        75932 :     relid = pq_getmsgint(in, 4);
     435              : 
     436        75932 :     action = pq_getmsgbyte(in);
     437        75932 :     if (action != 'N')
     438            0 :         elog(ERROR, "expected new tuple but got %d",
     439              :              action);
     440              : 
     441        75932 :     logicalrep_read_tuple(in, newtup);
     442              : 
     443        75932 :     return relid;
     444              : }
     445              : 
     446              : /*
     447              :  * Write UPDATE to the output stream.
     448              :  */
     449              : void
     450        34447 : logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
     451              :                         TupleTableSlot *oldslot, TupleTableSlot *newslot,
     452              :                         bool binary, Bitmapset *columns,
     453              :                         PublishGencolsType include_gencols_type)
     454              : {
     455        34447 :     pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
     456              : 
     457              :     Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
     458              :            rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     459              :            rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
     460              : 
     461              :     /* transaction ID (if not valid, we're not streaming) */
     462        34447 :     if (TransactionIdIsValid(xid))
     463        34232 :         pq_sendint32(out, xid);
     464              : 
     465              :     /* use Oid as relation identifier */
     466        34447 :     pq_sendint32(out, RelationGetRelid(rel));
     467              : 
     468        34447 :     if (oldslot != NULL)
     469              :     {
     470          134 :         if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     471           64 :             pq_sendbyte(out, 'O');  /* old tuple follows */
     472              :         else
     473           70 :             pq_sendbyte(out, 'K');  /* old key follows */
     474          134 :         logicalrep_write_tuple(out, rel, oldslot, binary, columns,
     475              :                                include_gencols_type);
     476              :     }
     477              : 
     478        34447 :     pq_sendbyte(out, 'N');      /* new tuple follows */
     479        34447 :     logicalrep_write_tuple(out, rel, newslot, binary, columns,
     480              :                            include_gencols_type);
     481        34447 : }
     482              : 
     483              : /*
     484              :  * Read UPDATE from stream.
     485              :  */
     486              : LogicalRepRelId
     487        31941 : logicalrep_read_update(StringInfo in, bool *has_oldtuple,
     488              :                        LogicalRepTupleData *oldtup,
     489              :                        LogicalRepTupleData *newtup)
     490              : {
     491              :     char        action;
     492              :     LogicalRepRelId relid;
     493              : 
     494              :     /* read the relation id */
     495        31941 :     relid = pq_getmsgint(in, 4);
     496              : 
     497              :     /* read and verify action */
     498        31941 :     action = pq_getmsgbyte(in);
     499        31941 :     if (action != 'K' && action != 'O' && action != 'N')
     500            0 :         elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
     501              :              action);
     502              : 
     503              :     /* check for old tuple */
     504        31941 :     if (action == 'K' || action == 'O')
     505              :     {
     506          136 :         logicalrep_read_tuple(in, oldtup);
     507          136 :         *has_oldtuple = true;
     508              : 
     509          136 :         action = pq_getmsgbyte(in);
     510              :     }
     511              :     else
     512        31805 :         *has_oldtuple = false;
     513              : 
     514              :     /* check for new tuple */
     515        31941 :     if (action != 'N')
     516            0 :         elog(ERROR, "expected action 'N', got %c",
     517              :              action);
     518              : 
     519        31941 :     logicalrep_read_tuple(in, newtup);
     520              : 
     521        31941 :     return relid;
     522              : }
     523              : 
     524              : /*
     525              :  * Write DELETE to the output stream.
     526              :  */
     527              : void
     528        41886 : logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
     529              :                         TupleTableSlot *oldslot, bool binary,
     530              :                         Bitmapset *columns,
     531              :                         PublishGencolsType include_gencols_type)
     532              : {
     533              :     Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
     534              :            rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     535              :            rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
     536              : 
     537        41886 :     pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
     538              : 
     539              :     /* transaction ID (if not valid, we're not streaming) */
     540        41886 :     if (TransactionIdIsValid(xid))
     541        41625 :         pq_sendint32(out, xid);
     542              : 
     543              :     /* use Oid as relation identifier */
     544        41886 :     pq_sendint32(out, RelationGetRelid(rel));
     545              : 
     546        41886 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     547          129 :         pq_sendbyte(out, 'O');  /* old tuple follows */
     548              :     else
     549        41757 :         pq_sendbyte(out, 'K');  /* old key follows */
     550              : 
     551        41886 :     logicalrep_write_tuple(out, rel, oldslot, binary, columns,
     552              :                            include_gencols_type);
     553        41886 : }
     554              : 
     555              : /*
     556              :  * Read DELETE from stream.
     557              :  *
     558              :  * Fills the old tuple.
     559              :  */
     560              : LogicalRepRelId
     561        40321 : logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
     562              : {
     563              :     char        action;
     564              :     LogicalRepRelId relid;
     565              : 
     566              :     /* read the relation id */
     567        40321 :     relid = pq_getmsgint(in, 4);
     568              : 
     569              :     /* read and verify action */
     570        40321 :     action = pq_getmsgbyte(in);
     571        40321 :     if (action != 'K' && action != 'O')
     572            0 :         elog(ERROR, "expected action 'O' or 'K', got %c", action);
     573              : 
     574        40321 :     logicalrep_read_tuple(in, oldtup);
     575              : 
     576        40321 :     return relid;
     577              : }
     578              : 
     579              : /*
     580              :  * Write TRUNCATE to the output stream.
     581              :  */
     582              : void
     583           12 : logicalrep_write_truncate(StringInfo out,
     584              :                           TransactionId xid,
     585              :                           int nrelids,
     586              :                           Oid relids[],
     587              :                           bool cascade, bool restart_seqs)
     588              : {
     589              :     int         i;
     590           12 :     uint8       flags = 0;
     591              : 
     592           12 :     pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
     593              : 
     594              :     /* transaction ID (if not valid, we're not streaming) */
     595           12 :     if (TransactionIdIsValid(xid))
     596            0 :         pq_sendint32(out, xid);
     597              : 
     598           12 :     pq_sendint32(out, nrelids);
     599              : 
     600              :     /* encode and send truncate flags */
     601           12 :     if (cascade)
     602            0 :         flags |= TRUNCATE_CASCADE;
     603           12 :     if (restart_seqs)
     604            0 :         flags |= TRUNCATE_RESTART_SEQS;
     605           12 :     pq_sendint8(out, flags);
     606              : 
     607           30 :     for (i = 0; i < nrelids; i++)
     608           18 :         pq_sendint32(out, relids[i]);
     609           12 : }
     610              : 
     611              : /*
     612              :  * Read TRUNCATE from stream.
     613              :  */
     614              : List *
     615           20 : logicalrep_read_truncate(StringInfo in,
     616              :                          bool *cascade, bool *restart_seqs)
     617              : {
     618              :     int         i;
     619              :     int         nrelids;
     620           20 :     List       *relids = NIL;
     621              :     uint8       flags;
     622              : 
     623           20 :     nrelids = pq_getmsgint(in, 4);
     624              : 
     625              :     /* read and decode truncate flags */
     626           20 :     flags = pq_getmsgint(in, 1);
     627           20 :     *cascade = (flags & TRUNCATE_CASCADE) > 0;
     628           20 :     *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
     629              : 
     630           49 :     for (i = 0; i < nrelids; i++)
     631           29 :         relids = lappend_oid(relids, pq_getmsgint(in, 4));
     632              : 
     633           20 :     return relids;
     634              : }
     635              : 
     636              : /*
     637              :  * Write MESSAGE to stream
     638              :  */
     639              : void
     640            5 : logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
     641              :                          bool transactional, const char *prefix, Size sz,
     642              :                          const char *message)
     643              : {
     644            5 :     uint8       flags = 0;
     645              : 
     646            5 :     pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
     647              : 
     648              :     /* encode and send message flags */
     649            5 :     if (transactional)
     650            2 :         flags |= MESSAGE_TRANSACTIONAL;
     651              : 
     652              :     /* transaction ID (if not valid, we're not streaming) */
     653            5 :     if (TransactionIdIsValid(xid))
     654            0 :         pq_sendint32(out, xid);
     655              : 
     656            5 :     pq_sendint8(out, flags);
     657            5 :     pq_sendint64(out, lsn);
     658            5 :     pq_sendstring(out, prefix);
     659            5 :     pq_sendint32(out, sz);
     660            5 :     pq_sendbytes(out, message, sz);
     661            5 : }
     662              : 
     663              : /*
     664              :  * Write relation description to the output stream.
     665              :  */
     666              : void
     667          394 : logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
     668              :                      Bitmapset *columns,
     669              :                      PublishGencolsType include_gencols_type)
     670              : {
     671              :     char       *relname;
     672              : 
     673          394 :     pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
     674              : 
     675              :     /* transaction ID (if not valid, we're not streaming) */
     676          394 :     if (TransactionIdIsValid(xid))
     677           74 :         pq_sendint32(out, xid);
     678              : 
     679              :     /* use Oid as relation identifier */
     680          394 :     pq_sendint32(out, RelationGetRelid(rel));
     681              : 
     682              :     /* send qualified relation name */
     683          394 :     logicalrep_write_namespace(out, RelationGetNamespace(rel));
     684          394 :     relname = RelationGetRelationName(rel);
     685          394 :     pq_sendstring(out, relname);
     686              : 
     687              :     /* send replica identity */
     688          394 :     pq_sendbyte(out, rel->rd_rel->relreplident);
     689              : 
     690              :     /* send the attribute info */
     691          394 :     logicalrep_write_attrs(out, rel, columns, include_gencols_type);
     692          394 : }
     693              : 
     694              : /*
     695              :  * Read the relation info from stream and return as LogicalRepRelation.
     696              :  */
     697              : LogicalRepRelation *
     698          454 : logicalrep_read_rel(StringInfo in)
     699              : {
     700          454 :     LogicalRepRelation *rel = palloc_object(LogicalRepRelation);
     701              : 
     702          454 :     rel->remoteid = pq_getmsgint(in, 4);
     703              : 
     704              :     /* Read relation name from stream */
     705          454 :     rel->nspname = pstrdup(logicalrep_read_namespace(in));
     706          454 :     rel->relname = pstrdup(pq_getmsgstring(in));
     707              : 
     708              :     /* Read the replica identity. */
     709          454 :     rel->replident = pq_getmsgbyte(in);
     710              : 
     711              :     /* relkind is not sent */
     712          454 :     rel->relkind = 0;
     713              : 
     714              :     /* Get attribute description */
     715          454 :     logicalrep_read_attrs(in, rel);
     716              : 
     717          454 :     return rel;
     718              : }
     719              : 
     720              : /*
     721              :  * Write type info to the output stream.
     722              :  *
     723              :  * This function will always write base type info.
     724              :  */
     725              : void
     726           18 : logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
     727              : {
     728           18 :     Oid         basetypoid = getBaseType(typoid);
     729              :     HeapTuple   tup;
     730              :     Form_pg_type typtup;
     731              : 
     732           18 :     pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
     733              : 
     734              :     /* transaction ID (if not valid, we're not streaming) */
     735           18 :     if (TransactionIdIsValid(xid))
     736            0 :         pq_sendint32(out, xid);
     737              : 
     738           18 :     tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
     739           18 :     if (!HeapTupleIsValid(tup))
     740            0 :         elog(ERROR, "cache lookup failed for type %u", basetypoid);
     741           18 :     typtup = (Form_pg_type) GETSTRUCT(tup);
     742              : 
     743              :     /* use Oid as type identifier */
     744           18 :     pq_sendint32(out, typoid);
     745              : 
     746              :     /* send qualified type name */
     747           18 :     logicalrep_write_namespace(out, typtup->typnamespace);
     748           18 :     pq_sendstring(out, NameStr(typtup->typname));
     749              : 
     750           18 :     ReleaseSysCache(tup);
     751           18 : }
     752              : 
     753              : /*
     754              :  * Read type info from the input stream.
     755              :  */
     756              : void
     757           18 : logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
     758              : {
     759           18 :     ltyp->remoteid = pq_getmsgint(in, 4);
     760              : 
     761              :     /* Read type name from stream */
     762           18 :     ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
     763           18 :     ltyp->typname = pstrdup(pq_getmsgstring(in));
     764           18 : }
     765              : 
     766              : /*
     767              :  * Write a tuple to the output stream, in the most efficient format possible.
     768              :  */
     769              : static void
     770       182397 : logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
     771              :                        bool binary, Bitmapset *columns,
     772              :                        PublishGencolsType include_gencols_type)
     773              : {
     774              :     TupleDesc   desc;
     775              :     Datum      *values;
     776              :     bool       *isnull;
     777              :     int         i;
     778       182397 :     uint16      nliveatts = 0;
     779              : 
     780       182397 :     desc = RelationGetDescr(rel);
     781              : 
     782       548708 :     for (i = 0; i < desc->natts; i++)
     783              :     {
     784       366311 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     785              : 
     786       366311 :         if (!logicalrep_should_publish_column(att, columns,
     787              :                                               include_gencols_type))
     788          141 :             continue;
     789              : 
     790       366170 :         nliveatts++;
     791              :     }
     792       182397 :     pq_sendint16(out, nliveatts);
     793              : 
     794       182397 :     slot_getallattrs(slot);
     795       182397 :     values = slot->tts_values;
     796       182397 :     isnull = slot->tts_isnull;
     797              : 
     798              :     /* Write the values */
     799       548708 :     for (i = 0; i < desc->natts; i++)
     800              :     {
     801              :         HeapTuple   typtup;
     802              :         Form_pg_type typclass;
     803       366311 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     804              : 
     805       366311 :         if (!logicalrep_should_publish_column(att, columns,
     806              :                                               include_gencols_type))
     807          141 :             continue;
     808              : 
     809       366170 :         if (isnull[i])
     810              :         {
     811        51925 :             pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
     812        51925 :             continue;
     813              :         }
     814              : 
     815       314245 :         if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(values[i])))
     816              :         {
     817              :             /*
     818              :              * Unchanged toasted datum.  (Note that we don't promise to detect
     819              :              * unchanged data in general; this is just a cheap check to avoid
     820              :              * sending large values unnecessarily.)
     821              :              */
     822            0 :             pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
     823            0 :             continue;
     824              :         }
     825              : 
     826       314245 :         typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
     827       314245 :         if (!HeapTupleIsValid(typtup))
     828            0 :             elog(ERROR, "cache lookup failed for type %u", att->atttypid);
     829       314245 :         typclass = (Form_pg_type) GETSTRUCT(typtup);
     830              : 
     831              :         /*
     832              :          * Send in binary if requested and type has suitable send function.
     833              :          */
     834       314245 :         if (binary && OidIsValid(typclass->typsend))
     835       115015 :         {
     836              :             bytea      *outputbytes;
     837              :             int         len;
     838              : 
     839       115015 :             pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
     840       115015 :             outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
     841       115015 :             len = VARSIZE(outputbytes) - VARHDRSZ;
     842       115015 :             pq_sendint(out, len, 4);    /* length */
     843       115015 :             pq_sendbytes(out, VARDATA(outputbytes), len);   /* data */
     844       115015 :             pfree(outputbytes);
     845              :         }
     846              :         else
     847              :         {
     848              :             char       *outputstr;
     849              : 
     850       199230 :             pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
     851       199230 :             outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
     852       199230 :             pq_sendcountedtext(out, outputstr, strlen(outputstr));
     853       199230 :             pfree(outputstr);
     854              :         }
     855              : 
     856       314245 :         ReleaseSysCache(typtup);
     857              :     }
     858       182397 : }
     859              : 
     860              : /*
     861              :  * Read tuple in logical replication format from stream.
     862              :  */
     863              : static void
     864       148330 : logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
     865              : {
     866              :     int         i;
     867              :     int         natts;
     868              : 
     869              :     /* Get number of attributes */
     870       148330 :     natts = pq_getmsgint(in, 2);
     871              : 
     872              :     /* Allocate space for per-column values; zero out unused StringInfoDatas */
     873       148330 :     tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
     874       148330 :     tuple->colstatus = palloc_array(char, natts);
     875       148330 :     tuple->ncols = natts;
     876              : 
     877              :     /* Read the data */
     878       451377 :     for (i = 0; i < natts; i++)
     879              :     {
     880              :         char       *buff;
     881              :         char        kind;
     882              :         int         len;
     883       303047 :         StringInfo  value = &tuple->colvalues[i];
     884              : 
     885       303047 :         kind = pq_getmsgbyte(in);
     886       303047 :         tuple->colstatus[i] = kind;
     887              : 
     888       303047 :         switch (kind)
     889              :         {
     890        50371 :             case LOGICALREP_COLUMN_NULL:
     891              :                 /* nothing more to do */
     892        50371 :                 break;
     893            0 :             case LOGICALREP_COLUMN_UNCHANGED:
     894              :                 /* we don't receive the value of an unchanged column */
     895            0 :                 break;
     896       252676 :             case LOGICALREP_COLUMN_TEXT:
     897              :             case LOGICALREP_COLUMN_BINARY:
     898       252676 :                 len = pq_getmsgint(in, 4);  /* read length */
     899              : 
     900              :                 /* and data */
     901       252676 :                 buff = palloc(len + 1);
     902       252676 :                 pq_copymsgbytes(in, buff, len);
     903              : 
     904              :                 /*
     905              :                  * NUL termination is required for LOGICALREP_COLUMN_TEXT mode
     906              :                  * as input functions require that.  For
     907              :                  * LOGICALREP_COLUMN_BINARY it's not technically required, but
     908              :                  * it's harmless.
     909              :                  */
     910       252676 :                 buff[len] = '\0';
     911              : 
     912       252676 :                 initStringInfoFromString(value, buff, len);
     913       252676 :                 break;
     914            0 :             default:
     915            0 :                 elog(ERROR, "unrecognized data representation type '%c'", kind);
     916              :         }
     917              :     }
     918       148330 : }
     919              : 
     920              : /*
     921              :  * Write relation attribute metadata to the stream.
     922              :  */
     923              : static void
     924          394 : logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns,
     925              :                        PublishGencolsType include_gencols_type)
     926              : {
     927              :     TupleDesc   desc;
     928              :     int         i;
     929          394 :     uint16      nliveatts = 0;
     930          394 :     Bitmapset  *idattrs = NULL;
     931              :     bool        replidentfull;
     932              : 
     933          394 :     desc = RelationGetDescr(rel);
     934              : 
     935              :     /* send number of live attributes */
     936         1221 :     for (i = 0; i < desc->natts; i++)
     937              :     {
     938          827 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     939              : 
     940          827 :         if (!logicalrep_should_publish_column(att, columns,
     941              :                                               include_gencols_type))
     942           71 :             continue;
     943              : 
     944          756 :         nliveatts++;
     945              :     }
     946          394 :     pq_sendint16(out, nliveatts);
     947              : 
     948              :     /* fetch bitmap of REPLICA IDENTITY attributes */
     949          394 :     replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
     950          394 :     if (!replidentfull)
     951          336 :         idattrs = RelationGetIdentityKeyBitmap(rel);
     952              : 
     953              :     /* send the attributes */
     954         1221 :     for (i = 0; i < desc->natts; i++)
     955              :     {
     956          827 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     957          827 :         uint8       flags = 0;
     958              : 
     959          827 :         if (!logicalrep_should_publish_column(att, columns,
     960              :                                               include_gencols_type))
     961           71 :             continue;
     962              : 
     963              :         /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
     964         1424 :         if (replidentfull ||
     965          668 :             bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
     966              :                           idattrs))
     967          359 :             flags |= LOGICALREP_IS_REPLICA_IDENTITY;
     968              : 
     969          756 :         pq_sendbyte(out, flags);
     970              : 
     971              :         /* attribute name */
     972          756 :         pq_sendstring(out, NameStr(att->attname));
     973              : 
     974              :         /* attribute type id */
     975          756 :         pq_sendint32(out, (int) att->atttypid);
     976              : 
     977              :         /* attribute type modifier (atttypmod) */
     978          756 :         pq_sendint32(out, att->atttypmod);
     979              :     }
     980              : 
     981          394 :     bms_free(idattrs);
     982          394 : }
     983              : 
     984              : /*
     985              :  * Read relation attribute metadata from the stream.
     986              :  */
     987              : static void
     988          454 : logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
     989              : {
     990              :     int         i;
     991              :     int         natts;
     992              :     char      **attnames;
     993              :     Oid        *atttyps;
     994          454 :     Bitmapset  *attkeys = NULL;
     995              : 
     996          454 :     natts = pq_getmsgint(in, 2);
     997          454 :     attnames = palloc_array(char *, natts);
     998          454 :     atttyps = palloc_array(Oid, natts);
     999              : 
    1000              :     /* read the attributes */
    1001         1313 :     for (i = 0; i < natts; i++)
    1002              :     {
    1003              :         uint8       flags;
    1004              : 
    1005              :         /* Check for replica identity column */
    1006          859 :         flags = pq_getmsgbyte(in);
    1007          859 :         if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
    1008          401 :             attkeys = bms_add_member(attkeys, i);
    1009              : 
    1010              :         /* attribute name */
    1011          859 :         attnames[i] = pstrdup(pq_getmsgstring(in));
    1012              : 
    1013              :         /* attribute type id */
    1014          859 :         atttyps[i] = (Oid) pq_getmsgint(in, 4);
    1015              : 
    1016              :         /* we ignore the attribute type modifier for now */
    1017          859 :         (void) pq_getmsgint(in, 4);
    1018              :     }
    1019              : 
    1020          454 :     rel->attnames = attnames;
    1021          454 :     rel->atttyps = atttyps;
    1022          454 :     rel->attkeys = attkeys;
    1023          454 :     rel->natts = natts;
    1024          454 : }
    1025              : 
    1026              : /*
    1027              :  * Write the namespace name or empty string for pg_catalog (to save space).
    1028              :  */
    1029              : static void
    1030          412 : logicalrep_write_namespace(StringInfo out, Oid nspid)
    1031              : {
    1032          412 :     if (nspid == PG_CATALOG_NAMESPACE)
    1033            1 :         pq_sendbyte(out, '\0');
    1034              :     else
    1035              :     {
    1036          411 :         char       *nspname = get_namespace_name(nspid);
    1037              : 
    1038          411 :         if (nspname == NULL)
    1039            0 :             elog(ERROR, "cache lookup failed for namespace %u",
    1040              :                  nspid);
    1041              : 
    1042          411 :         pq_sendstring(out, nspname);
    1043              :     }
    1044          412 : }
    1045              : 
    1046              : /*
    1047              :  * Read the namespace name while treating empty string as pg_catalog.
    1048              :  */
    1049              : static const char *
    1050          472 : logicalrep_read_namespace(StringInfo in)
    1051              : {
    1052          472 :     const char *nspname = pq_getmsgstring(in);
    1053              : 
    1054          472 :     if (nspname[0] == '\0')
    1055            1 :         nspname = "pg_catalog";
    1056              : 
    1057          472 :     return nspname;
    1058              : }
    1059              : 
    1060              : /*
    1061              :  * Write the information for the start stream message to the output stream.
    1062              :  */
    1063              : void
    1064          638 : logicalrep_write_stream_start(StringInfo out,
    1065              :                               TransactionId xid, bool first_segment)
    1066              : {
    1067          638 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
    1068              : 
    1069              :     Assert(TransactionIdIsValid(xid));
    1070              : 
    1071              :     /* transaction ID (we're starting to stream, so must be valid) */
    1072          638 :     pq_sendint32(out, xid);
    1073              : 
    1074              :     /* 1 if this is the first streaming segment for this xid */
    1075          638 :     pq_sendbyte(out, first_segment ? 1 : 0);
    1076          638 : }
    1077              : 
    1078              : /*
    1079              :  * Read the information about the start stream message from the input stream.
    1080              :  */
    1081              : TransactionId
    1082          842 : logicalrep_read_stream_start(StringInfo in, bool *first_segment)
    1083              : {
    1084              :     TransactionId xid;
    1085              : 
    1086              :     Assert(first_segment);
    1087              : 
    1088          842 :     xid = pq_getmsgint(in, 4);
    1089          842 :     *first_segment = (pq_getmsgbyte(in) == 1);
    1090              : 
    1091          842 :     return xid;
    1092              : }
    1093              : 
    1094              : /*
    1095              :  * Write the stop stream message to the output stream.
    1096              :  */
    1097              : void
    1098          638 : logicalrep_write_stream_stop(StringInfo out)
    1099              : {
    1100          638 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_STOP);
    1101          638 : }
    1102              : 
    1103              : /*
    1104              :  * Write STREAM COMMIT to the output stream.
    1105              :  */
    1106              : void
    1107           45 : logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
    1108              :                                XLogRecPtr commit_lsn)
    1109              : {
    1110           45 :     uint8       flags = 0;
    1111              : 
    1112           45 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
    1113              : 
    1114              :     Assert(TransactionIdIsValid(txn->xid));
    1115              : 
    1116              :     /* transaction ID */
    1117           45 :     pq_sendint32(out, txn->xid);
    1118              : 
    1119              :     /* send the flags field (unused for now) */
    1120           45 :     pq_sendbyte(out, flags);
    1121              : 
    1122              :     /* send fields */
    1123           45 :     pq_sendint64(out, commit_lsn);
    1124           45 :     pq_sendint64(out, txn->end_lsn);
    1125           45 :     pq_sendint64(out, txn->commit_time);
    1126           45 : }
    1127              : 
    1128              : /*
    1129              :  * Read STREAM COMMIT from the input stream.
    1130              :  */
    1131              : TransactionId
    1132           61 : logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
    1133              : {
    1134              :     TransactionId xid;
    1135              :     uint8       flags;
    1136              : 
    1137           61 :     xid = pq_getmsgint(in, 4);
    1138              : 
    1139              :     /* read flags (unused for now) */
    1140           61 :     flags = pq_getmsgbyte(in);
    1141              : 
    1142           61 :     if (flags != 0)
    1143            0 :         elog(ERROR, "unrecognized flags %u in commit message", flags);
    1144              : 
    1145              :     /* read fields */
    1146           61 :     commit_data->commit_lsn = pq_getmsgint64(in);
    1147           61 :     commit_data->end_lsn = pq_getmsgint64(in);
    1148           61 :     commit_data->committime = pq_getmsgint64(in);
    1149              : 
    1150           61 :     return xid;
    1151              : }
    1152              : 
    1153              : /*
    1154              :  * Write STREAM ABORT to the output stream. Note that xid and subxid will be
    1155              :  * same for the top-level transaction abort.
    1156              :  *
    1157              :  * If write_abort_info is true, send the abort_lsn and abort_time fields,
    1158              :  * otherwise don't.
    1159              :  */
    1160              : void
    1161           26 : logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
    1162              :                               TransactionId subxid, XLogRecPtr abort_lsn,
    1163              :                               TimestampTz abort_time, bool write_abort_info)
    1164              : {
    1165           26 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
    1166              : 
    1167              :     Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
    1168              : 
    1169              :     /* transaction ID */
    1170           26 :     pq_sendint32(out, xid);
    1171           26 :     pq_sendint32(out, subxid);
    1172              : 
    1173           26 :     if (write_abort_info)
    1174              :     {
    1175           12 :         pq_sendint64(out, abort_lsn);
    1176           12 :         pq_sendint64(out, abort_time);
    1177              :     }
    1178           26 : }
    1179              : 
    1180              : /*
    1181              :  * Read STREAM ABORT from the input stream.
    1182              :  *
    1183              :  * If read_abort_info is true, read the abort_lsn and abort_time fields,
    1184              :  * otherwise don't.
    1185              :  */
    1186              : void
    1187           38 : logicalrep_read_stream_abort(StringInfo in,
    1188              :                              LogicalRepStreamAbortData *abort_data,
    1189              :                              bool read_abort_info)
    1190              : {
    1191              :     Assert(abort_data);
    1192              : 
    1193           38 :     abort_data->xid = pq_getmsgint(in, 4);
    1194           38 :     abort_data->subxid = pq_getmsgint(in, 4);
    1195              : 
    1196           38 :     if (read_abort_info)
    1197              :     {
    1198           24 :         abort_data->abort_lsn = pq_getmsgint64(in);
    1199           24 :         abort_data->abort_time = pq_getmsgint64(in);
    1200              :     }
    1201              :     else
    1202              :     {
    1203           14 :         abort_data->abort_lsn = InvalidXLogRecPtr;
    1204           14 :         abort_data->abort_time = 0;
    1205              :     }
    1206           38 : }
    1207              : 
    1208              : /*
    1209              :  * Get string representing LogicalRepMsgType.
    1210              :  */
    1211              : const char *
    1212          876 : logicalrep_message_type(LogicalRepMsgType action)
    1213              : {
    1214              :     static char err_unknown[20];
    1215              : 
    1216          876 :     switch (action)
    1217              :     {
    1218            1 :         case LOGICAL_REP_MSG_BEGIN:
    1219            1 :             return "BEGIN";
    1220            7 :         case LOGICAL_REP_MSG_COMMIT:
    1221            7 :             return "COMMIT";
    1222            0 :         case LOGICAL_REP_MSG_ORIGIN:
    1223            0 :             return "ORIGIN";
    1224          532 :         case LOGICAL_REP_MSG_INSERT:
    1225          532 :             return "INSERT";
    1226           23 :         case LOGICAL_REP_MSG_UPDATE:
    1227           23 :             return "UPDATE";
    1228           14 :         case LOGICAL_REP_MSG_DELETE:
    1229           14 :             return "DELETE";
    1230            0 :         case LOGICAL_REP_MSG_TRUNCATE:
    1231            0 :             return "TRUNCATE";
    1232            2 :         case LOGICAL_REP_MSG_RELATION:
    1233            2 :             return "RELATION";
    1234            0 :         case LOGICAL_REP_MSG_TYPE:
    1235            0 :             return "TYPE";
    1236            0 :         case LOGICAL_REP_MSG_MESSAGE:
    1237            0 :             return "MESSAGE";
    1238            1 :         case LOGICAL_REP_MSG_BEGIN_PREPARE:
    1239            1 :             return "BEGIN PREPARE";
    1240            2 :         case LOGICAL_REP_MSG_PREPARE:
    1241            2 :             return "PREPARE";
    1242            0 :         case LOGICAL_REP_MSG_COMMIT_PREPARED:
    1243            0 :             return "COMMIT PREPARED";
    1244            0 :         case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
    1245            0 :             return "ROLLBACK PREPARED";
    1246           13 :         case LOGICAL_REP_MSG_STREAM_START:
    1247           13 :             return "STREAM START";
    1248          235 :         case LOGICAL_REP_MSG_STREAM_STOP:
    1249          235 :             return "STREAM STOP";
    1250           23 :         case LOGICAL_REP_MSG_STREAM_COMMIT:
    1251           23 :             return "STREAM COMMIT";
    1252           19 :         case LOGICAL_REP_MSG_STREAM_ABORT:
    1253           19 :             return "STREAM ABORT";
    1254            4 :         case LOGICAL_REP_MSG_STREAM_PREPARE:
    1255            4 :             return "STREAM PREPARE";
    1256              :     }
    1257              : 
    1258              :     /*
    1259              :      * This message provides context in the error raised when applying a
    1260              :      * logical message. So we can't throw an error here. Return an unknown
    1261              :      * indicator value so that the original error is still reported.
    1262              :      */
    1263            0 :     snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);
    1264              : 
    1265            0 :     return err_unknown;
    1266              : }
    1267              : 
    1268              : /*
    1269              :  * Check if the column 'att' of a table should be published.
    1270              :  *
    1271              :  * 'columns' represents the publication column list (if any) for that table.
    1272              :  *
    1273              :  * 'include_gencols_type' value indicates whether generated columns should be
    1274              :  * published when there is no column list. Typically, this will have the same
    1275              :  * value as the 'publish_generated_columns' publication parameter.
    1276              :  *
    1277              :  * Note that generated columns can be published only when present in a
    1278              :  * publication column list, or when include_gencols_type is
    1279              :  * PUBLISH_GENCOLS_STORED.
    1280              :  */
    1281              : bool
    1282       735103 : logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns,
    1283              :                                  PublishGencolsType include_gencols_type)
    1284              : {
    1285       735103 :     if (att->attisdropped)
    1286           51 :         return false;
    1287              : 
    1288              :     /* If a column list is provided, publish only the cols in that list. */
    1289       735052 :     if (columns)
    1290          949 :         return bms_is_member(att->attnum, columns);
    1291              : 
    1292              :     /* All non-generated columns are always published. */
    1293       734103 :     if (!att->attgenerated)
    1294       734046 :         return true;
    1295              : 
    1296              :     /*
    1297              :      * Stored generated columns are only published when the user sets
    1298              :      * publish_generated_columns as stored.
    1299              :      */
    1300           57 :     if (att->attgenerated == ATTRIBUTE_GENERATED_STORED)
    1301           33 :         return include_gencols_type == PUBLISH_GENCOLS_STORED;
    1302              : 
    1303           24 :     return false;
    1304              : }
        

Generated by: LCOV version 2.0-1