LCOV - code coverage report
Current view: top level - src/backend/replication/logical - proto.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 234 255 91.8 %
Date: 2020-05-25 03:07:04 Functions: 22 24 91.7 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * proto.c
       4             :  *      logical replication protocol functions
       5             :  *
       6             :  * Copyright (c) 2015-2020, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *      src/backend/replication/logical/proto.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "access/sysattr.h"
      16             : #include "catalog/pg_namespace.h"
      17             : #include "catalog/pg_type.h"
      18             : #include "libpq/pqformat.h"
      19             : #include "replication/logicalproto.h"
      20             : #include "utils/builtins.h"
      21             : #include "utils/lsyscache.h"
      22             : #include "utils/syscache.h"
      23             : 
      24             : /*
      25             :  * Protocol message flags.
      26             :  */
      27             : #define LOGICALREP_IS_REPLICA_IDENTITY 1
      28             : 
      29             : #define TRUNCATE_CASCADE        (1<<0)
      30             : #define TRUNCATE_RESTART_SEQS   (1<<1)
      31             : 
      32             : static void logicalrep_write_attrs(StringInfo out, Relation rel);
      33             : static void logicalrep_write_tuple(StringInfo out, Relation rel,
      34             :                                    HeapTuple tuple);
      35             : 
      36             : static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
      37             : static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
      38             : 
      39             : static void logicalrep_write_namespace(StringInfo out, Oid nspid);
      40             : static const char *logicalrep_read_namespace(StringInfo in);
      41             : 
      42             : /*
      43             :  * Write BEGIN to the output stream.
      44             :  */
      45             : void
      46         308 : logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
      47             : {
      48         308 :     pq_sendbyte(out, 'B');      /* BEGIN */
      49             : 
      50             :     /* fixed fields */
      51         308 :     pq_sendint64(out, txn->final_lsn);
      52         308 :     pq_sendint64(out, txn->commit_time);
      53         308 :     pq_sendint32(out, txn->xid);
      54         308 : }
      55             : 
      56             : /*
      57             :  * Read transaction BEGIN from the stream.
      58             :  */
      59             : void
      60         462 : logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
      61             : {
      62             :     /* read fields */
      63         462 :     begin_data->final_lsn = pq_getmsgint64(in);
      64         462 :     if (begin_data->final_lsn == InvalidXLogRecPtr)
      65           0 :         elog(ERROR, "final_lsn not set in begin message");
      66         462 :     begin_data->committime = pq_getmsgint64(in);
      67         462 :     begin_data->xid = pq_getmsgint(in, 4);
      68         462 : }
      69             : 
      70             : 
      71             : /*
      72             :  * Write COMMIT to the output stream.
      73             :  */
      74             : void
      75         308 : logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
      76             :                         XLogRecPtr commit_lsn)
      77             : {
      78         308 :     uint8       flags = 0;
      79             : 
      80         308 :     pq_sendbyte(out, 'C');      /* sending COMMIT */
      81             : 
      82             :     /* send the flags field (unused for now) */
      83         308 :     pq_sendbyte(out, flags);
      84             : 
      85             :     /* send fields */
      86         308 :     pq_sendint64(out, commit_lsn);
      87         308 :     pq_sendint64(out, txn->end_lsn);
      88         308 :     pq_sendint64(out, txn->commit_time);
      89         308 : }
      90             : 
      91             : /*
      92             :  * Read transaction COMMIT from the stream.
      93             :  */
      94             : void
      95         460 : logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
      96             : {
      97             :     /* read flags (unused for now) */
      98         460 :     uint8       flags = pq_getmsgbyte(in);
      99             : 
     100         460 :     if (flags != 0)
     101           0 :         elog(ERROR, "unrecognized flags %u in commit message", flags);
     102             : 
     103             :     /* read fields */
     104         460 :     commit_data->commit_lsn = pq_getmsgint64(in);
     105         460 :     commit_data->end_lsn = pq_getmsgint64(in);
     106         460 :     commit_data->committime = pq_getmsgint64(in);
     107         460 : }
     108             : 
     109             : /*
     110             :  * Write ORIGIN to the output stream.
     111             :  */
     112             : void
     113           0 : logicalrep_write_origin(StringInfo out, const char *origin,
     114             :                         XLogRecPtr origin_lsn)
     115             : {
     116           0 :     pq_sendbyte(out, 'O');      /* ORIGIN */
     117             : 
     118             :     /* fixed fields */
     119           0 :     pq_sendint64(out, origin_lsn);
     120             : 
     121             :     /* origin string */
     122           0 :     pq_sendstring(out, origin);
     123           0 : }
     124             : 
     125             : /*
     126             :  * Read ORIGIN from the output stream.
     127             :  */
     128             : char *
     129           0 : logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
     130             : {
     131             :     /* fixed fields */
     132           0 :     *origin_lsn = pq_getmsgint64(in);
     133             : 
     134             :     /* return origin */
     135           0 :     return pstrdup(pq_getmsgstring(in));
     136             : }
     137             : 
     138             : /*
     139             :  * Write INSERT to the output stream.
     140             :  */
     141             : void
     142         782 : logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
     143             : {
     144         782 :     pq_sendbyte(out, 'I');      /* action INSERT */
     145             : 
     146             :     /* use Oid as relation identifier */
     147         782 :     pq_sendint32(out, RelationGetRelid(rel));
     148             : 
     149         782 :     pq_sendbyte(out, 'N');      /* new tuple follows */
     150         782 :     logicalrep_write_tuple(out, rel, newtuple);
     151         782 : }
     152             : 
     153             : /*
     154             :  * Read INSERT from stream.
     155             :  *
     156             :  * Fills the new tuple.
     157             :  */
     158             : LogicalRepRelId
     159         860 : logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
     160             : {
     161             :     char        action;
     162             :     LogicalRepRelId relid;
     163             : 
     164             :     /* read the relation id */
     165         860 :     relid = pq_getmsgint(in, 4);
     166             : 
     167         860 :     action = pq_getmsgbyte(in);
     168         860 :     if (action != 'N')
     169           0 :         elog(ERROR, "expected new tuple but got %d",
     170             :              action);
     171             : 
     172         860 :     logicalrep_read_tuple(in, newtup);
     173             : 
     174         860 :     return relid;
     175             : }
     176             : 
     177             : /*
     178             :  * Write UPDATE to the output stream.
     179             :  */
     180             : void
     181         238 : logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
     182             :                         HeapTuple newtuple)
     183             : {
     184         238 :     pq_sendbyte(out, 'U');      /* action UPDATE */
     185             : 
     186             :     Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
     187             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     188             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
     189             : 
     190             :     /* use Oid as relation identifier */
     191         238 :     pq_sendint32(out, RelationGetRelid(rel));
     192             : 
     193         238 :     if (oldtuple != NULL)
     194             :     {
     195         136 :         if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     196          46 :             pq_sendbyte(out, 'O');  /* old tuple follows */
     197             :         else
     198          90 :             pq_sendbyte(out, 'K');  /* old key follows */
     199         136 :         logicalrep_write_tuple(out, rel, oldtuple);
     200             :     }
     201             : 
     202         238 :     pq_sendbyte(out, 'N');      /* new tuple follows */
     203         238 :     logicalrep_write_tuple(out, rel, newtuple);
     204         238 : }
     205             : 
     206             : /*
     207             :  * Read UPDATE from stream.
     208             :  */
     209             : LogicalRepRelId
     210         262 : logicalrep_read_update(StringInfo in, bool *has_oldtuple,
     211             :                        LogicalRepTupleData *oldtup,
     212             :                        LogicalRepTupleData *newtup)
     213             : {
     214             :     char        action;
     215             :     LogicalRepRelId relid;
     216             : 
     217             :     /* read the relation id */
     218         262 :     relid = pq_getmsgint(in, 4);
     219             : 
     220             :     /* read and verify action */
     221         262 :     action = pq_getmsgbyte(in);
     222         262 :     if (action != 'K' && action != 'O' && action != 'N')
     223           0 :         elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
     224             :              action);
     225             : 
     226             :     /* check for old tuple */
     227         262 :     if (action == 'K' || action == 'O')
     228             :     {
     229         160 :         logicalrep_read_tuple(in, oldtup);
     230         160 :         *has_oldtuple = true;
     231             : 
     232         160 :         action = pq_getmsgbyte(in);
     233             :     }
     234             :     else
     235         102 :         *has_oldtuple = false;
     236             : 
     237             :     /* check for new  tuple */
     238         262 :     if (action != 'N')
     239           0 :         elog(ERROR, "expected action 'N', got %c",
     240             :              action);
     241             : 
     242         262 :     logicalrep_read_tuple(in, newtup);
     243             : 
     244         262 :     return relid;
     245             : }
     246             : 
     247             : /*
     248             :  * Write DELETE to the output stream.
     249             :  */
     250             : void
     251         394 : logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
     252             : {
     253             :     Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
     254             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     255             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
     256             : 
     257         394 :     pq_sendbyte(out, 'D');      /* action DELETE */
     258             : 
     259             :     /* use Oid as relation identifier */
     260         394 :     pq_sendint32(out, RelationGetRelid(rel));
     261             : 
     262         394 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     263         200 :         pq_sendbyte(out, 'O');  /* old tuple follows */
     264             :     else
     265         194 :         pq_sendbyte(out, 'K');  /* old key follows */
     266             : 
     267         394 :     logicalrep_write_tuple(out, rel, oldtuple);
     268         394 : }
     269             : 
     270             : /*
     271             :  * Read DELETE from stream.
     272             :  *
     273             :  * Fills the old tuple.
     274             :  */
     275             : LogicalRepRelId
     276         452 : logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
     277             : {
     278             :     char        action;
     279             :     LogicalRepRelId relid;
     280             : 
     281             :     /* read the relation id */
     282         452 :     relid = pq_getmsgint(in, 4);
     283             : 
     284             :     /* read and verify action */
     285         452 :     action = pq_getmsgbyte(in);
     286         452 :     if (action != 'K' && action != 'O')
     287           0 :         elog(ERROR, "expected action 'O' or 'K', got %c", action);
     288             : 
     289         452 :     logicalrep_read_tuple(in, oldtup);
     290             : 
     291         452 :     return relid;
     292             : }
     293             : 
     294             : /*
     295             :  * Write TRUNCATE to the output stream.
     296             :  */
     297             : void
     298           6 : logicalrep_write_truncate(StringInfo out,
     299             :                           int nrelids,
     300             :                           Oid relids[],
     301             :                           bool cascade, bool restart_seqs)
     302             : {
     303             :     int         i;
     304           6 :     uint8       flags = 0;
     305             : 
     306           6 :     pq_sendbyte(out, 'T');      /* action TRUNCATE */
     307             : 
     308           6 :     pq_sendint32(out, nrelids);
     309             : 
     310             :     /* encode and send truncate flags */
     311           6 :     if (cascade)
     312           0 :         flags |= TRUNCATE_CASCADE;
     313           6 :     if (restart_seqs)
     314           0 :         flags |= TRUNCATE_RESTART_SEQS;
     315           6 :     pq_sendint8(out, flags);
     316             : 
     317          16 :     for (i = 0; i < nrelids; i++)
     318          10 :         pq_sendint32(out, relids[i]);
     319           6 : }
     320             : 
     321             : /*
     322             :  * Read TRUNCATE from stream.
     323             :  */
     324             : List *
     325          24 : logicalrep_read_truncate(StringInfo in,
     326             :                          bool *cascade, bool *restart_seqs)
     327             : {
     328             :     int         i;
     329             :     int         nrelids;
     330          24 :     List       *relids = NIL;
     331             :     uint8       flags;
     332             : 
     333          24 :     nrelids = pq_getmsgint(in, 4);
     334             : 
     335             :     /* read and decode truncate flags */
     336          24 :     flags = pq_getmsgint(in, 1);
     337          24 :     *cascade = (flags & TRUNCATE_CASCADE) > 0;
     338          24 :     *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
     339             : 
     340          64 :     for (i = 0; i < nrelids; i++)
     341          40 :         relids = lappend_oid(relids, pq_getmsgint(in, 4));
     342             : 
     343          24 :     return relids;
     344             : }
     345             : 
     346             : /*
     347             :  * Write relation description to the output stream.
     348             :  */
     349             : void
     350          98 : logicalrep_write_rel(StringInfo out, Relation rel)
     351             : {
     352             :     char       *relname;
     353             : 
     354          98 :     pq_sendbyte(out, 'R');      /* sending RELATION */
     355             : 
     356             :     /* use Oid as relation identifier */
     357          98 :     pq_sendint32(out, RelationGetRelid(rel));
     358             : 
     359             :     /* send qualified relation name */
     360          98 :     logicalrep_write_namespace(out, RelationGetNamespace(rel));
     361          98 :     relname = RelationGetRelationName(rel);
     362          98 :     pq_sendstring(out, relname);
     363             : 
     364             :     /* send replica identity */
     365          98 :     pq_sendbyte(out, rel->rd_rel->relreplident);
     366             : 
     367             :     /* send the attribute info */
     368          98 :     logicalrep_write_attrs(out, rel);
     369          98 : }
     370             : 
     371             : /*
     372             :  * Read the relation info from stream and return as LogicalRepRelation.
     373             :  */
     374             : LogicalRepRelation *
     375         170 : logicalrep_read_rel(StringInfo in)
     376             : {
     377         170 :     LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
     378             : 
     379         170 :     rel->remoteid = pq_getmsgint(in, 4);
     380             : 
     381             :     /* Read relation name from stream */
     382         170 :     rel->nspname = pstrdup(logicalrep_read_namespace(in));
     383         170 :     rel->relname = pstrdup(pq_getmsgstring(in));
     384             : 
     385             :     /* Read the replica identity. */
     386         170 :     rel->replident = pq_getmsgbyte(in);
     387             : 
     388             :     /* Get attribute description */
     389         170 :     logicalrep_read_attrs(in, rel);
     390             : 
     391         170 :     return rel;
     392             : }
     393             : 
     394             : /*
     395             :  * Write type info to the output stream.
     396             :  *
     397             :  * This function will always write base type info.
     398             :  */
     399             : void
     400          32 : logicalrep_write_typ(StringInfo out, Oid typoid)
     401             : {
     402          32 :     Oid         basetypoid = getBaseType(typoid);
     403             :     HeapTuple   tup;
     404             :     Form_pg_type typtup;
     405             : 
     406          32 :     pq_sendbyte(out, 'Y');      /* sending TYPE */
     407             : 
     408          32 :     tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
     409          32 :     if (!HeapTupleIsValid(tup))
     410           0 :         elog(ERROR, "cache lookup failed for type %u", basetypoid);
     411          32 :     typtup = (Form_pg_type) GETSTRUCT(tup);
     412             : 
     413             :     /* use Oid as relation identifier */
     414          32 :     pq_sendint32(out, typoid);
     415             : 
     416             :     /* send qualified type name */
     417          32 :     logicalrep_write_namespace(out, typtup->typnamespace);
     418          32 :     pq_sendstring(out, NameStr(typtup->typname));
     419             : 
     420          32 :     ReleaseSysCache(tup);
     421          32 : }
     422             : 
     423             : /*
     424             :  * Read type info from the output stream.
     425             :  */
     426             : void
     427          32 : logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
     428             : {
     429          32 :     ltyp->remoteid = pq_getmsgint(in, 4);
     430             : 
     431             :     /* Read type name from stream */
     432          32 :     ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
     433          32 :     ltyp->typname = pstrdup(pq_getmsgstring(in));
     434          32 : }
     435             : 
     436             : /*
     437             :  * Write a tuple to the outputstream, in the most efficient format possible.
     438             :  */
     439             : static void
     440        1550 : logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
     441             : {
     442             :     TupleDesc   desc;
     443             :     Datum       values[MaxTupleAttributeNumber];
     444             :     bool        isnull[MaxTupleAttributeNumber];
     445             :     int         i;
     446        1550 :     uint16      nliveatts = 0;
     447             : 
     448        1550 :     desc = RelationGetDescr(rel);
     449             : 
     450        3790 :     for (i = 0; i < desc->natts; i++)
     451             :     {
     452        2240 :         if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
     453           4 :             continue;
     454        2236 :         nliveatts++;
     455             :     }
     456        1550 :     pq_sendint16(out, nliveatts);
     457             : 
     458             :     /* try to allocate enough memory from the get-go */
     459        3100 :     enlargeStringInfo(out, tuple->t_len +
     460        1550 :                       nliveatts * (1 + 4));
     461             : 
     462        1550 :     heap_deform_tuple(tuple, desc, values, isnull);
     463             : 
     464             :     /* Write the values */
     465        3790 :     for (i = 0; i < desc->natts; i++)
     466             :     {
     467             :         HeapTuple   typtup;
     468             :         Form_pg_type typclass;
     469        2240 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     470             :         char       *outputstr;
     471             : 
     472        2240 :         if (att->attisdropped || att->attgenerated)
     473           4 :             continue;
     474             : 
     475        2236 :         if (isnull[i])
     476             :         {
     477         368 :             pq_sendbyte(out, 'n');  /* null column */
     478         368 :             continue;
     479             :         }
     480        1868 :         else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
     481             :         {
     482           2 :             pq_sendbyte(out, 'u');  /* unchanged toast column */
     483           2 :             continue;
     484             :         }
     485             : 
     486        1866 :         typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
     487        1866 :         if (!HeapTupleIsValid(typtup))
     488           0 :             elog(ERROR, "cache lookup failed for type %u", att->atttypid);
     489        1866 :         typclass = (Form_pg_type) GETSTRUCT(typtup);
     490             : 
     491        1866 :         pq_sendbyte(out, 't');  /* 'text' data follows */
     492             : 
     493        1866 :         outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
     494        1866 :         pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
     495        1866 :         pfree(outputstr);
     496             : 
     497        1866 :         ReleaseSysCache(typtup);
     498             :     }
     499        1550 : }
     500             : 
     501             : /*
     502             :  * Read tuple in remote format from stream.
     503             :  *
     504             :  * The returned tuple points into the input stringinfo.
     505             :  */
     506             : static void
     507        1734 : logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
     508             : {
     509             :     int         i;
     510             :     int         natts;
     511             : 
     512             :     /* Get number of attributes */
     513        1734 :     natts = pq_getmsgint(in, 2);
     514             : 
     515        1734 :     memset(tuple->changed, 0, sizeof(tuple->changed));
     516             : 
     517             :     /* Read the data */
     518        4330 :     for (i = 0; i < natts; i++)
     519             :     {
     520             :         char        kind;
     521             : 
     522        2596 :         kind = pq_getmsgbyte(in);
     523             : 
     524        2596 :         switch (kind)
     525             :         {
     526         546 :             case 'n':           /* null */
     527         546 :                 tuple->values[i] = NULL;
     528         546 :                 tuple->changed[i] = true;
     529         546 :                 break;
     530           2 :             case 'u':           /* unchanged column */
     531             :                 /* we don't receive the value of an unchanged column */
     532           2 :                 tuple->values[i] = NULL;
     533           2 :                 break;
     534        2048 :             case 't':           /* text formatted value */
     535             :                 {
     536             :                     int         len;
     537             : 
     538        2048 :                     tuple->changed[i] = true;
     539             : 
     540        2048 :                     len = pq_getmsgint(in, 4);  /* read length */
     541             : 
     542             :                     /* and data */
     543        2048 :                     tuple->values[i] = palloc(len + 1);
     544        2048 :                     pq_copymsgbytes(in, tuple->values[i], len);
     545        2048 :                     tuple->values[i][len] = '\0';
     546             :                 }
     547        2048 :                 break;
     548           0 :             default:
     549           0 :                 elog(ERROR, "unrecognized data representation type '%c'", kind);
     550             :         }
     551             :     }
     552        1734 : }
     553             : 
     554             : /*
     555             :  * Write relation attributes to the stream.
     556             :  */
     557             : static void
     558          98 : logicalrep_write_attrs(StringInfo out, Relation rel)
     559             : {
     560             :     TupleDesc   desc;
     561             :     int         i;
     562          98 :     uint16      nliveatts = 0;
     563          98 :     Bitmapset  *idattrs = NULL;
     564             :     bool        replidentfull;
     565             : 
     566          98 :     desc = RelationGetDescr(rel);
     567             : 
     568             :     /* send number of live attributes */
     569         276 :     for (i = 0; i < desc->natts; i++)
     570             :     {
     571         178 :         if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
     572           2 :             continue;
     573         176 :         nliveatts++;
     574             :     }
     575          98 :     pq_sendint16(out, nliveatts);
     576             : 
     577             :     /* fetch bitmap of REPLICATION IDENTITY attributes */
     578          98 :     replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
     579          98 :     if (!replidentfull)
     580          86 :         idattrs = RelationGetIndexAttrBitmap(rel,
     581             :                                              INDEX_ATTR_BITMAP_IDENTITY_KEY);
     582             : 
     583             :     /* send the attributes */
     584         276 :     for (i = 0; i < desc->natts; i++)
     585             :     {
     586         178 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     587         178 :         uint8       flags = 0;
     588             : 
     589         178 :         if (att->attisdropped || att->attgenerated)
     590           2 :             continue;
     591             : 
     592             :         /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
     593         338 :         if (replidentfull ||
     594         162 :             bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
     595             :                           idattrs))
     596          80 :             flags |= LOGICALREP_IS_REPLICA_IDENTITY;
     597             : 
     598         176 :         pq_sendbyte(out, flags);
     599             : 
     600             :         /* attribute name */
     601         176 :         pq_sendstring(out, NameStr(att->attname));
     602             : 
     603             :         /* attribute type id */
     604         176 :         pq_sendint32(out, (int) att->atttypid);
     605             : 
     606             :         /* attribute mode */
     607         176 :         pq_sendint32(out, att->atttypmod);
     608             :     }
     609             : 
     610          98 :     bms_free(idattrs);
     611          98 : }
     612             : 
     613             : /*
     614             :  * Read relation attribute names from the stream.
     615             :  */
     616             : static void
     617         170 : logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
     618             : {
     619             :     int         i;
     620             :     int         natts;
     621             :     char      **attnames;
     622             :     Oid        *atttyps;
     623         170 :     Bitmapset  *attkeys = NULL;
     624             : 
     625         170 :     natts = pq_getmsgint(in, 2);
     626         170 :     attnames = palloc(natts * sizeof(char *));
     627         170 :     atttyps = palloc(natts * sizeof(Oid));
     628             : 
     629             :     /* read the attributes */
     630         480 :     for (i = 0; i < natts; i++)
     631             :     {
     632             :         uint8       flags;
     633             : 
     634             :         /* Check for replica identity column */
     635         310 :         flags = pq_getmsgbyte(in);
     636         310 :         if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
     637         152 :             attkeys = bms_add_member(attkeys, i);
     638             : 
     639             :         /* attribute name */
     640         310 :         attnames[i] = pstrdup(pq_getmsgstring(in));
     641             : 
     642             :         /* attribute type id */
     643         310 :         atttyps[i] = (Oid) pq_getmsgint(in, 4);
     644             : 
     645             :         /* we ignore attribute mode for now */
     646         310 :         (void) pq_getmsgint(in, 4);
     647             :     }
     648             : 
     649         170 :     rel->attnames = attnames;
     650         170 :     rel->atttyps = atttyps;
     651         170 :     rel->attkeys = attkeys;
     652         170 :     rel->natts = natts;
     653         170 : }
     654             : 
     655             : /*
     656             :  * Write the namespace name or empty string for pg_catalog (to save space).
     657             :  */
     658             : static void
     659         130 : logicalrep_write_namespace(StringInfo out, Oid nspid)
     660             : {
     661         130 :     if (nspid == PG_CATALOG_NAMESPACE)
     662           2 :         pq_sendbyte(out, '\0');
     663             :     else
     664             :     {
     665         128 :         char       *nspname = get_namespace_name(nspid);
     666             : 
     667         128 :         if (nspname == NULL)
     668           0 :             elog(ERROR, "cache lookup failed for namespace %u",
     669             :                  nspid);
     670             : 
     671         128 :         pq_sendstring(out, nspname);
     672             :     }
     673         130 : }
     674             : 
     675             : /*
     676             :  * Read the namespace name while treating empty string as pg_catalog.
     677             :  */
     678             : static const char *
     679         202 : logicalrep_read_namespace(StringInfo in)
     680             : {
     681         202 :     const char *nspname = pq_getmsgstring(in);
     682             : 
     683         202 :     if (nspname[0] == '\0')
     684           2 :         nspname = "pg_catalog";
     685             : 
     686         202 :     return nspname;
     687             : }

Generated by: LCOV version 1.13