LCOV - code coverage report
Current view: top level - src/backend/replication/logical - proto.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 223 251 88.8 %
Date: 2019-11-21 12:06:29 Functions: 22 24 91.7 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * proto.c
       4             :  *      logical replication protocol functions
       5             :  *
       6             :  * Copyright (c) 2015-2019, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *      src/backend/replication/logical/proto.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "access/sysattr.h"
      16             : #include "catalog/pg_namespace.h"
      17             : #include "catalog/pg_type.h"
      18             : #include "libpq/pqformat.h"
      19             : #include "replication/logicalproto.h"
      20             : #include "utils/builtins.h"
      21             : #include "utils/lsyscache.h"
      22             : #include "utils/syscache.h"
      23             : 
      24             : /*
      25             :  * Protocol message flags.
      26             :  */
      27             : #define LOGICALREP_IS_REPLICA_IDENTITY 1
      28             : 
      29             : #define TRUNCATE_CASCADE        (1<<0)
      30             : #define TRUNCATE_RESTART_SEQS   (1<<1)
      31             : 
      32             : static void logicalrep_write_attrs(StringInfo out, Relation rel);
      33             : static void logicalrep_write_tuple(StringInfo out, Relation rel,
      34             :                                    HeapTuple tuple);
      35             : 
      36             : static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
      37             : static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
      38             : 
      39             : static void logicalrep_write_namespace(StringInfo out, Oid nspid);
      40             : static const char *logicalrep_read_namespace(StringInfo in);
      41             : 
      42             : /*
      43             :  * Write BEGIN to the output stream.
      44             :  */
      45             : void
      46         238 : logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
      47             : {
      48         238 :     pq_sendbyte(out, 'B');      /* BEGIN */
      49             : 
      50             :     /* fixed fields */
      51         238 :     pq_sendint64(out, txn->final_lsn);
      52         238 :     pq_sendint64(out, txn->commit_time);
      53         238 :     pq_sendint32(out, txn->xid);
      54         238 : }
      55             : 
      56             : /*
      57             :  * Read transaction BEGIN from the stream.
      58             :  */
      59             : void
      60         274 : logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
      61             : {
      62             :     /* read fields */
      63         274 :     begin_data->final_lsn = pq_getmsgint64(in);
      64         274 :     if (begin_data->final_lsn == InvalidXLogRecPtr)
      65           0 :         elog(ERROR, "final_lsn not set in begin message");
      66         274 :     begin_data->committime = pq_getmsgint64(in);
      67         274 :     begin_data->xid = pq_getmsgint(in, 4);
      68         274 : }
      69             : 
      70             : 
      71             : /*
      72             :  * Write COMMIT to the output stream.
      73             :  */
      74             : void
      75         238 : logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
      76             :                         XLogRecPtr commit_lsn)
      77             : {
      78         238 :     uint8       flags = 0;
      79             : 
      80         238 :     pq_sendbyte(out, 'C');      /* sending COMMIT */
      81             : 
      82             :     /* send the flags field (unused for now) */
      83         238 :     pq_sendbyte(out, flags);
      84             : 
      85             :     /* send fields */
      86         238 :     pq_sendint64(out, commit_lsn);
      87         238 :     pq_sendint64(out, txn->end_lsn);
      88         238 :     pq_sendint64(out, txn->commit_time);
      89         238 : }
      90             : 
      91             : /*
      92             :  * Read transaction COMMIT from the stream.
      93             :  */
      94             : void
      95         272 : logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
      96             : {
      97             :     /* read flags (unused for now) */
      98         272 :     uint8       flags = pq_getmsgbyte(in);
      99             : 
     100         272 :     if (flags != 0)
     101           0 :         elog(ERROR, "unrecognized flags %u in commit message", flags);
     102             : 
     103             :     /* read fields */
     104         272 :     commit_data->commit_lsn = pq_getmsgint64(in);
     105         272 :     commit_data->end_lsn = pq_getmsgint64(in);
     106         272 :     commit_data->committime = pq_getmsgint64(in);
     107         272 : }
     108             : 
     109             : /*
     110             :  * Write ORIGIN to the output stream.
     111             :  */
     112             : void
     113           0 : logicalrep_write_origin(StringInfo out, const char *origin,
     114             :                         XLogRecPtr origin_lsn)
     115             : {
     116           0 :     pq_sendbyte(out, 'O');      /* ORIGIN */
     117             : 
     118             :     /* fixed fields */
     119           0 :     pq_sendint64(out, origin_lsn);
     120             : 
     121             :     /* origin string */
     122           0 :     pq_sendstring(out, origin);
     123           0 : }
     124             : 
     125             : /*
     126             :  * Read ORIGIN from the output stream.
     127             :  */
     128             : char *
     129           0 : logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
     130             : {
     131             :     /* fixed fields */
     132           0 :     *origin_lsn = pq_getmsgint64(in);
     133             : 
     134             :     /* return origin */
     135           0 :     return pstrdup(pq_getmsgstring(in));
     136             : }
     137             : 
     138             : /*
     139             :  * Write INSERT to the output stream.
     140             :  */
     141             : void
     142         728 : logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
     143             : {
     144         728 :     pq_sendbyte(out, 'I');      /* action INSERT */
     145             : 
     146             :     Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
     147             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     148             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
     149             : 
     150             :     /* use Oid as relation identifier */
     151         728 :     pq_sendint32(out, RelationGetRelid(rel));
     152             : 
     153         728 :     pq_sendbyte(out, 'N');      /* new tuple follows */
     154         728 :     logicalrep_write_tuple(out, rel, newtuple);
     155         728 : }
     156             : 
     157             : /*
     158             :  * Read INSERT from stream.
     159             :  *
     160             :  * Fills the new tuple.
     161             :  */
     162             : LogicalRepRelId
     163         730 : logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
     164             : {
     165             :     char        action;
     166             :     LogicalRepRelId relid;
     167             : 
     168             :     /* read the relation id */
     169         730 :     relid = pq_getmsgint(in, 4);
     170             : 
     171         730 :     action = pq_getmsgbyte(in);
     172         730 :     if (action != 'N')
     173           0 :         elog(ERROR, "expected new tuple but got %d",
     174             :              action);
     175             : 
     176         730 :     logicalrep_read_tuple(in, newtup);
     177             : 
     178         730 :     return relid;
     179             : }
     180             : 
     181             : /*
     182             :  * Write UPDATE to the output stream.
     183             :  */
     184             : void
     185         212 : logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
     186             :                         HeapTuple newtuple)
     187             : {
     188         212 :     pq_sendbyte(out, 'U');      /* action UPDATE */
     189             : 
     190             :     Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
     191             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     192             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
     193             : 
     194             :     /* use Oid as relation identifier */
     195         212 :     pq_sendint32(out, RelationGetRelid(rel));
     196             : 
     197         212 :     if (oldtuple != NULL)
     198             :     {
     199         124 :         if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     200          44 :             pq_sendbyte(out, 'O');  /* old tuple follows */
     201             :         else
     202          80 :             pq_sendbyte(out, 'K');  /* old key follows */
     203         124 :         logicalrep_write_tuple(out, rel, oldtuple);
     204             :     }
     205             : 
     206         212 :     pq_sendbyte(out, 'N');      /* new tuple follows */
     207         212 :     logicalrep_write_tuple(out, rel, newtuple);
     208         212 : }
     209             : 
     210             : /*
     211             :  * Read UPDATE from stream.
     212             :  */
     213             : LogicalRepRelId
     214         214 : logicalrep_read_update(StringInfo in, bool *has_oldtuple,
     215             :                        LogicalRepTupleData *oldtup,
     216             :                        LogicalRepTupleData *newtup)
     217             : {
     218             :     char        action;
     219             :     LogicalRepRelId relid;
     220             : 
     221             :     /* read the relation id */
     222         214 :     relid = pq_getmsgint(in, 4);
     223             : 
     224             :     /* read and verify action */
     225         214 :     action = pq_getmsgbyte(in);
     226         214 :     if (action != 'K' && action != 'O' && action != 'N')
     227           0 :         elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
     228             :              action);
     229             : 
     230             :     /* check for old tuple */
     231         214 :     if (action == 'K' || action == 'O')
     232             :     {
     233         126 :         logicalrep_read_tuple(in, oldtup);
     234         126 :         *has_oldtuple = true;
     235             : 
     236         126 :         action = pq_getmsgbyte(in);
     237             :     }
     238             :     else
     239          88 :         *has_oldtuple = false;
     240             : 
     241             :     /* check for new  tuple */
     242         214 :     if (action != 'N')
     243           0 :         elog(ERROR, "expected action 'N', got %c",
     244             :              action);
     245             : 
     246         214 :     logicalrep_read_tuple(in, newtup);
     247             : 
     248         214 :     return relid;
     249             : }
     250             : 
     251             : /*
     252             :  * Write DELETE to the output stream.
     253             :  */
     254             : void
     255         382 : logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
     256             : {
     257             :     Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
     258             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     259             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
     260             : 
     261         382 :     pq_sendbyte(out, 'D');      /* action DELETE */
     262             : 
     263             :     /* use Oid as relation identifier */
     264         382 :     pq_sendint32(out, RelationGetRelid(rel));
     265             : 
     266         382 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     267         200 :         pq_sendbyte(out, 'O');  /* old tuple follows */
     268             :     else
     269         182 :         pq_sendbyte(out, 'K');  /* old key follows */
     270             : 
     271         382 :     logicalrep_write_tuple(out, rel, oldtuple);
     272         382 : }
     273             : 
     274             : /*
     275             :  * Read DELETE from stream.
     276             :  *
     277             :  * Fills the old tuple.
     278             :  */
     279             : LogicalRepRelId
     280         382 : logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
     281             : {
     282             :     char        action;
     283             :     LogicalRepRelId relid;
     284             : 
     285             :     /* read the relation id */
     286         382 :     relid = pq_getmsgint(in, 4);
     287             : 
     288             :     /* read and verify action */
     289         382 :     action = pq_getmsgbyte(in);
     290         382 :     if (action != 'K' && action != 'O')
     291           0 :         elog(ERROR, "expected action 'O' or 'K', got %c", action);
     292             : 
     293         382 :     logicalrep_read_tuple(in, oldtup);
     294             : 
     295         382 :     return relid;
     296             : }
     297             : 
     298             : /*
     299             :  * Write TRUNCATE to the output stream.
     300             :  */
     301             : void
     302           2 : logicalrep_write_truncate(StringInfo out,
     303             :                           int nrelids,
     304             :                           Oid relids[],
     305             :                           bool cascade, bool restart_seqs)
     306             : {
     307             :     int         i;
     308           2 :     uint8       flags = 0;
     309             : 
     310           2 :     pq_sendbyte(out, 'T');      /* action TRUNCATE */
     311             : 
     312           2 :     pq_sendint32(out, nrelids);
     313             : 
     314             :     /* encode and send truncate flags */
     315           2 :     if (cascade)
     316           0 :         flags |= TRUNCATE_CASCADE;
     317           2 :     if (restart_seqs)
     318           0 :         flags |= TRUNCATE_RESTART_SEQS;
     319           2 :     pq_sendint8(out, flags);
     320             : 
     321           4 :     for (i = 0; i < nrelids; i++)
     322           2 :         pq_sendint32(out, relids[i]);
     323           2 : }
     324             : 
     325             : /*
     326             :  * Read TRUNCATE from stream.
     327             :  */
     328             : List *
     329          10 : logicalrep_read_truncate(StringInfo in,
     330             :                          bool *cascade, bool *restart_seqs)
     331             : {
     332             :     int         i;
     333             :     int         nrelids;
     334          10 :     List       *relids = NIL;
     335             :     uint8       flags;
     336             : 
     337          10 :     nrelids = pq_getmsgint(in, 4);
     338             : 
     339             :     /* read and decode truncate flags */
     340          10 :     flags = pq_getmsgint(in, 1);
     341          10 :     *cascade = (flags & TRUNCATE_CASCADE) > 0;
     342          10 :     *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
     343             : 
     344          22 :     for (i = 0; i < nrelids; i++)
     345          12 :         relids = lappend_oid(relids, pq_getmsgint(in, 4));
     346             : 
     347          10 :     return relids;
     348             : }
     349             : 
     350             : /*
     351             :  * Write relation description to the output stream.
     352             :  */
     353             : void
     354          78 : logicalrep_write_rel(StringInfo out, Relation rel)
     355             : {
     356             :     char       *relname;
     357             : 
     358          78 :     pq_sendbyte(out, 'R');      /* sending RELATION */
     359             : 
     360             :     /* use Oid as relation identifier */
     361          78 :     pq_sendint32(out, RelationGetRelid(rel));
     362             : 
     363             :     /* send qualified relation name */
     364          78 :     logicalrep_write_namespace(out, RelationGetNamespace(rel));
     365          78 :     relname = RelationGetRelationName(rel);
     366          78 :     pq_sendstring(out, relname);
     367             : 
     368             :     /* send replica identity */
     369          78 :     pq_sendbyte(out, rel->rd_rel->relreplident);
     370             : 
     371             :     /* send the attribute info */
     372          78 :     logicalrep_write_attrs(out, rel);
     373          78 : }
     374             : 
     375             : /*
     376             :  * Read the relation info from stream and return as LogicalRepRelation.
     377             :  */
     378             : LogicalRepRelation *
     379          90 : logicalrep_read_rel(StringInfo in)
     380             : {
     381          90 :     LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
     382             : 
     383          90 :     rel->remoteid = pq_getmsgint(in, 4);
     384             : 
     385             :     /* Read relation name from stream */
     386          90 :     rel->nspname = pstrdup(logicalrep_read_namespace(in));
     387          90 :     rel->relname = pstrdup(pq_getmsgstring(in));
     388             : 
     389             :     /* Read the replica identity. */
     390          90 :     rel->replident = pq_getmsgbyte(in);
     391             : 
     392             :     /* Get attribute description */
     393          90 :     logicalrep_read_attrs(in, rel);
     394             : 
     395          90 :     return rel;
     396             : }
     397             : 
     398             : /*
     399             :  * Write type info to the output stream.
     400             :  *
     401             :  * This function will always write base type info.
     402             :  */
     403             : void
     404          32 : logicalrep_write_typ(StringInfo out, Oid typoid)
     405             : {
     406          32 :     Oid         basetypoid = getBaseType(typoid);
     407             :     HeapTuple   tup;
     408             :     Form_pg_type typtup;
     409             : 
     410          32 :     pq_sendbyte(out, 'Y');      /* sending TYPE */
     411             : 
     412          32 :     tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
     413          32 :     if (!HeapTupleIsValid(tup))
     414           0 :         elog(ERROR, "cache lookup failed for type %u", basetypoid);
     415          32 :     typtup = (Form_pg_type) GETSTRUCT(tup);
     416             : 
     417             :     /* use Oid as relation identifier */
     418          32 :     pq_sendint32(out, typoid);
     419             : 
     420             :     /* send qualified type name */
     421          32 :     logicalrep_write_namespace(out, typtup->typnamespace);
     422          32 :     pq_sendstring(out, NameStr(typtup->typname));
     423             : 
     424          32 :     ReleaseSysCache(tup);
     425          32 : }
     426             : 
     427             : /*
     428             :  * Read type info from the output stream.
     429             :  */
     430             : void
     431          32 : logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
     432             : {
     433          32 :     ltyp->remoteid = pq_getmsgint(in, 4);
     434             : 
     435             :     /* Read type name from stream */
     436          32 :     ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
     437          32 :     ltyp->typname = pstrdup(pq_getmsgstring(in));
     438          32 : }
     439             : 
     440             : /*
     441             :  * Write a tuple to the outputstream, in the most efficient format possible.
     442             :  */
     443             : static void
     444        1446 : logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
     445             : {
     446             :     TupleDesc   desc;
     447             :     Datum       values[MaxTupleAttributeNumber];
     448             :     bool        isnull[MaxTupleAttributeNumber];
     449             :     int         i;
     450        1446 :     uint16      nliveatts = 0;
     451             : 
     452        1446 :     desc = RelationGetDescr(rel);
     453             : 
     454        3504 :     for (i = 0; i < desc->natts; i++)
     455             :     {
     456        2058 :         if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
     457           0 :             continue;
     458        2058 :         nliveatts++;
     459             :     }
     460        1446 :     pq_sendint16(out, nliveatts);
     461             : 
     462             :     /* try to allocate enough memory from the get-go */
     463        2892 :     enlargeStringInfo(out, tuple->t_len +
     464        1446 :                       nliveatts * (1 + 4));
     465             : 
     466        1446 :     heap_deform_tuple(tuple, desc, values, isnull);
     467             : 
     468             :     /* Write the values */
     469        3504 :     for (i = 0; i < desc->natts; i++)
     470             :     {
     471             :         HeapTuple   typtup;
     472             :         Form_pg_type typclass;
     473        2058 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     474             :         char       *outputstr;
     475             : 
     476        2058 :         if (att->attisdropped || att->attgenerated)
     477           0 :             continue;
     478             : 
     479        2058 :         if (isnull[i])
     480             :         {
     481         326 :             pq_sendbyte(out, 'n');  /* null column */
     482         326 :             continue;
     483             :         }
     484        1732 :         else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
     485             :         {
     486           0 :             pq_sendbyte(out, 'u');  /* unchanged toast column */
     487           0 :             continue;
     488             :         }
     489             : 
     490        1732 :         typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
     491        1732 :         if (!HeapTupleIsValid(typtup))
     492           0 :             elog(ERROR, "cache lookup failed for type %u", att->atttypid);
     493        1732 :         typclass = (Form_pg_type) GETSTRUCT(typtup);
     494             : 
     495        1732 :         pq_sendbyte(out, 't');  /* 'text' data follows */
     496             : 
     497        1732 :         outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
     498        1732 :         pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
     499        1732 :         pfree(outputstr);
     500             : 
     501        1732 :         ReleaseSysCache(typtup);
     502             :     }
     503        1446 : }
     504             : 
     505             : /*
     506             :  * Read tuple in remote format from stream.
     507             :  *
     508             :  * The returned tuple points into the input stringinfo.
     509             :  */
     510             : static void
     511        1452 : logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
     512             : {
     513             :     int         i;
     514             :     int         natts;
     515             : 
     516             :     /* Get number of attributes */
     517        1452 :     natts = pq_getmsgint(in, 2);
     518             : 
     519        1452 :     memset(tuple->changed, 0, sizeof(tuple->changed));
     520             : 
     521             :     /* Read the data */
     522        3514 :     for (i = 0; i < natts; i++)
     523             :     {
     524             :         char        kind;
     525             : 
     526        2062 :         kind = pq_getmsgbyte(in);
     527             : 
     528        2062 :         switch (kind)
     529             :         {
     530             :             case 'n':           /* null */
     531         326 :                 tuple->values[i] = NULL;
     532         326 :                 tuple->changed[i] = true;
     533         326 :                 break;
     534             :             case 'u':           /* unchanged column */
     535             :                 /* we don't receive the value of an unchanged column */
     536           0 :                 tuple->values[i] = NULL;
     537           0 :                 break;
     538             :             case 't':           /* text formatted value */
     539             :                 {
     540             :                     int         len;
     541             : 
     542        1736 :                     tuple->changed[i] = true;
     543             : 
     544        1736 :                     len = pq_getmsgint(in, 4);  /* read length */
     545             : 
     546             :                     /* and data */
     547        1736 :                     tuple->values[i] = palloc(len + 1);
     548        1736 :                     pq_copymsgbytes(in, tuple->values[i], len);
     549        1736 :                     tuple->values[i][len] = '\0';
     550             :                 }
     551        1736 :                 break;
     552             :             default:
     553           0 :                 elog(ERROR, "unrecognized data representation type '%c'", kind);
     554             :         }
     555             :     }
     556        1452 : }
     557             : 
     558             : /*
     559             :  * Write relation attributes to the stream.
     560             :  */
     561             : static void
     562          78 : logicalrep_write_attrs(StringInfo out, Relation rel)
     563             : {
     564             :     TupleDesc   desc;
     565             :     int         i;
     566          78 :     uint16      nliveatts = 0;
     567          78 :     Bitmapset  *idattrs = NULL;
     568             :     bool        replidentfull;
     569             : 
     570          78 :     desc = RelationGetDescr(rel);
     571             : 
     572             :     /* send number of live attributes */
     573         214 :     for (i = 0; i < desc->natts; i++)
     574             :     {
     575         136 :         if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
     576           0 :             continue;
     577         136 :         nliveatts++;
     578             :     }
     579          78 :     pq_sendint16(out, nliveatts);
     580             : 
     581             :     /* fetch bitmap of REPLICATION IDENTITY attributes */
     582          78 :     replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
     583          78 :     if (!replidentfull)
     584          68 :         idattrs = RelationGetIndexAttrBitmap(rel,
     585             :                                              INDEX_ATTR_BITMAP_IDENTITY_KEY);
     586             : 
     587             :     /* send the attributes */
     588         214 :     for (i = 0; i < desc->natts; i++)
     589             :     {
     590         136 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     591         136 :         uint8       flags = 0;
     592             : 
     593         136 :         if (att->attisdropped || att->attgenerated)
     594           0 :             continue;
     595             : 
     596             :         /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
     597         262 :         if (replidentfull ||
     598         126 :             bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
     599             :                           idattrs))
     600          60 :             flags |= LOGICALREP_IS_REPLICA_IDENTITY;
     601             : 
     602         136 :         pq_sendbyte(out, flags);
     603             : 
     604             :         /* attribute name */
     605         136 :         pq_sendstring(out, NameStr(att->attname));
     606             : 
     607             :         /* attribute type id */
     608         136 :         pq_sendint32(out, (int) att->atttypid);
     609             : 
     610             :         /* attribute mode */
     611         136 :         pq_sendint32(out, att->atttypmod);
     612             :     }
     613             : 
     614          78 :     bms_free(idattrs);
     615          78 : }
     616             : 
     617             : /*
     618             :  * Read relation attribute names from the stream.
     619             :  */
     620             : static void
     621          90 : logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
     622             : {
     623             :     int         i;
     624             :     int         natts;
     625             :     char      **attnames;
     626             :     Oid        *atttyps;
     627          90 :     Bitmapset  *attkeys = NULL;
     628             : 
     629          90 :     natts = pq_getmsgint(in, 2);
     630          90 :     attnames = palloc(natts * sizeof(char *));
     631          90 :     atttyps = palloc(natts * sizeof(Oid));
     632             : 
     633             :     /* read the attributes */
     634         240 :     for (i = 0; i < natts; i++)
     635             :     {
     636             :         uint8       flags;
     637             : 
     638             :         /* Check for replica identity column */
     639         150 :         flags = pq_getmsgbyte(in);
     640         150 :         if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
     641          72 :             attkeys = bms_add_member(attkeys, i);
     642             : 
     643             :         /* attribute name */
     644         150 :         attnames[i] = pstrdup(pq_getmsgstring(in));
     645             : 
     646             :         /* attribute type id */
     647         150 :         atttyps[i] = (Oid) pq_getmsgint(in, 4);
     648             : 
     649             :         /* we ignore attribute mode for now */
     650         150 :         (void) pq_getmsgint(in, 4);
     651             :     }
     652             : 
     653          90 :     rel->attnames = attnames;
     654          90 :     rel->atttyps = atttyps;
     655          90 :     rel->attkeys = attkeys;
     656          90 :     rel->natts = natts;
     657          90 : }
     658             : 
     659             : /*
     660             :  * Write the namespace name or empty string for pg_catalog (to save space).
     661             :  */
     662             : static void
     663         110 : logicalrep_write_namespace(StringInfo out, Oid nspid)
     664             : {
     665         110 :     if (nspid == PG_CATALOG_NAMESPACE)
     666           2 :         pq_sendbyte(out, '\0');
     667             :     else
     668             :     {
     669         108 :         char       *nspname = get_namespace_name(nspid);
     670             : 
     671         108 :         if (nspname == NULL)
     672           0 :             elog(ERROR, "cache lookup failed for namespace %u",
     673             :                  nspid);
     674             : 
     675         108 :         pq_sendstring(out, nspname);
     676             :     }
     677         110 : }
     678             : 
     679             : /*
     680             :  * Read the namespace name while treating empty string as pg_catalog.
     681             :  */
     682             : static const char *
     683         122 : logicalrep_read_namespace(StringInfo in)
     684             : {
     685         122 :     const char *nspname = pq_getmsgstring(in);
     686             : 
     687         122 :     if (nspname[0] == '\0')
     688           2 :         nspname = "pg_catalog";
     689             : 
     690         122 :     return nspname;
     691             : }

Generated by: LCOV version 1.13