Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * proto.c
4 : * logical replication protocol functions
5 : *
6 : * Copyright (c) 2015-2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/logical/proto.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/sysattr.h"
16 : #include "catalog/pg_namespace.h"
17 : #include "catalog/pg_type.h"
18 : #include "libpq/pqformat.h"
19 : #include "replication/logicalproto.h"
20 : #include "utils/lsyscache.h"
21 : #include "utils/syscache.h"
22 :
23 : /*
24 : * Protocol message flags.
25 : */
26 : #define LOGICALREP_IS_REPLICA_IDENTITY 1
27 :
28 : #define MESSAGE_TRANSACTIONAL (1<<0)
29 : #define TRUNCATE_CASCADE (1<<0)
30 : #define TRUNCATE_RESTART_SEQS (1<<1)
31 :
32 : static void logicalrep_write_attrs(StringInfo out, Relation rel,
33 : Bitmapset *columns,
34 : PublishGencolsType include_gencols_type);
35 : static void logicalrep_write_tuple(StringInfo out, Relation rel,
36 : TupleTableSlot *slot,
37 : bool binary, Bitmapset *columns,
38 : PublishGencolsType include_gencols_type);
39 : static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
40 : static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
41 :
42 : static void logicalrep_write_namespace(StringInfo out, Oid nspid);
43 : static const char *logicalrep_read_namespace(StringInfo in);
44 :
45 : /*
46 : * Write BEGIN to the output stream.
47 : */
48 : void
49 896 : logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
50 : {
51 896 : pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
52 :
53 : /* fixed fields */
54 896 : pq_sendint64(out, txn->final_lsn);
55 896 : pq_sendint64(out, txn->commit_time);
56 896 : pq_sendint32(out, txn->xid);
57 896 : }
58 :
59 : /*
60 : * Read transaction BEGIN from the stream.
61 : */
62 : void
63 960 : logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
64 : {
65 : /* read fields */
66 960 : begin_data->final_lsn = pq_getmsgint64(in);
67 960 : if (!XLogRecPtrIsValid(begin_data->final_lsn))
68 0 : elog(ERROR, "final_lsn not set in begin message");
69 960 : begin_data->committime = pq_getmsgint64(in);
70 960 : begin_data->xid = pq_getmsgint(in, 4);
71 960 : }
72 :
73 :
74 : /*
75 : * Write COMMIT to the output stream.
76 : */
77 : void
78 894 : logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
79 : XLogRecPtr commit_lsn)
80 : {
81 894 : uint8 flags = 0;
82 :
83 894 : pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
84 :
85 : /* send the flags field (unused for now) */
86 894 : pq_sendbyte(out, flags);
87 :
88 : /* send fields */
89 894 : pq_sendint64(out, commit_lsn);
90 894 : pq_sendint64(out, txn->end_lsn);
91 894 : pq_sendint64(out, txn->commit_time);
92 894 : }
93 :
94 : /*
95 : * Read transaction COMMIT from the stream.
96 : */
97 : void
98 876 : logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
99 : {
100 : /* read flags (unused for now) */
101 876 : uint8 flags = pq_getmsgbyte(in);
102 :
103 876 : if (flags != 0)
104 0 : elog(ERROR, "unrecognized flags %u in commit message", flags);
105 :
106 : /* read fields */
107 876 : commit_data->commit_lsn = pq_getmsgint64(in);
108 876 : commit_data->end_lsn = pq_getmsgint64(in);
109 876 : commit_data->committime = pq_getmsgint64(in);
110 876 : }
111 :
112 : /*
113 : * Write BEGIN PREPARE to the output stream.
114 : */
115 : void
116 34 : logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
117 : {
118 34 : pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE);
119 :
120 : /* fixed fields */
121 34 : pq_sendint64(out, txn->final_lsn);
122 34 : pq_sendint64(out, txn->end_lsn);
123 34 : pq_sendint64(out, txn->prepare_time);
124 34 : pq_sendint32(out, txn->xid);
125 :
126 : /* send gid */
127 34 : pq_sendstring(out, txn->gid);
128 34 : }
129 :
130 : /*
131 : * Read transaction BEGIN PREPARE from the stream.
132 : */
133 : void
134 32 : logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
135 : {
136 : /* read fields */
137 32 : begin_data->prepare_lsn = pq_getmsgint64(in);
138 32 : if (!XLogRecPtrIsValid(begin_data->prepare_lsn))
139 0 : elog(ERROR, "prepare_lsn not set in begin prepare message");
140 32 : begin_data->end_lsn = pq_getmsgint64(in);
141 32 : if (!XLogRecPtrIsValid(begin_data->end_lsn))
142 0 : elog(ERROR, "end_lsn not set in begin prepare message");
143 32 : begin_data->prepare_time = pq_getmsgint64(in);
144 32 : begin_data->xid = pq_getmsgint(in, 4);
145 :
146 : /* read gid (copy it into a pre-allocated buffer) */
147 32 : strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
148 32 : }
149 :
150 : /*
151 : * The core functionality for logicalrep_write_prepare and
152 : * logicalrep_write_stream_prepare.
153 : */
154 : static void
155 62 : logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
156 : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
157 : {
158 62 : uint8 flags = 0;
159 :
160 62 : pq_sendbyte(out, type);
161 :
162 : /*
163 : * This should only ever happen for two-phase commit transactions, in
164 : * which case we expect to have a valid GID.
165 : */
166 : Assert(txn->gid != NULL);
167 : Assert(rbtxn_is_prepared(txn));
168 : Assert(TransactionIdIsValid(txn->xid));
169 :
170 : /* send the flags field */
171 62 : pq_sendbyte(out, flags);
172 :
173 : /* send fields */
174 62 : pq_sendint64(out, prepare_lsn);
175 62 : pq_sendint64(out, txn->end_lsn);
176 62 : pq_sendint64(out, txn->prepare_time);
177 62 : pq_sendint32(out, txn->xid);
178 :
179 : /* send gid */
180 62 : pq_sendstring(out, txn->gid);
181 62 : }
182 :
183 : /*
184 : * Write PREPARE to the output stream.
185 : */
186 : void
187 34 : logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
188 : XLogRecPtr prepare_lsn)
189 : {
190 34 : logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
191 : txn, prepare_lsn);
192 34 : }
193 :
194 : /*
195 : * The core functionality for logicalrep_read_prepare and
196 : * logicalrep_read_stream_prepare.
197 : */
198 : static void
199 52 : logicalrep_read_prepare_common(StringInfo in, char *msgtype,
200 : LogicalRepPreparedTxnData *prepare_data)
201 : {
202 : /* read flags */
203 52 : uint8 flags = pq_getmsgbyte(in);
204 :
205 52 : if (flags != 0)
206 0 : elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
207 :
208 : /* read fields */
209 52 : prepare_data->prepare_lsn = pq_getmsgint64(in);
210 52 : if (!XLogRecPtrIsValid(prepare_data->prepare_lsn))
211 0 : elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
212 52 : prepare_data->end_lsn = pq_getmsgint64(in);
213 52 : if (!XLogRecPtrIsValid(prepare_data->end_lsn))
214 0 : elog(ERROR, "end_lsn is not set in %s message", msgtype);
215 52 : prepare_data->prepare_time = pq_getmsgint64(in);
216 52 : prepare_data->xid = pq_getmsgint(in, 4);
217 52 : if (prepare_data->xid == InvalidTransactionId)
218 0 : elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
219 :
220 : /* read gid (copy it into a pre-allocated buffer) */
221 52 : strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
222 52 : }
223 :
224 : /*
225 : * Read transaction PREPARE from the stream.
226 : */
227 : void
228 30 : logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
229 : {
230 30 : logicalrep_read_prepare_common(in, "prepare", prepare_data);
231 30 : }
232 :
233 : /*
234 : * Write COMMIT PREPARED to the output stream.
235 : */
236 : void
237 48 : logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
238 : XLogRecPtr commit_lsn)
239 : {
240 48 : uint8 flags = 0;
241 :
242 48 : pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
243 :
244 : /*
245 : * This should only ever happen for two-phase commit transactions, in
246 : * which case we expect to have a valid GID.
247 : */
248 : Assert(txn->gid != NULL);
249 :
250 : /* send the flags field */
251 48 : pq_sendbyte(out, flags);
252 :
253 : /* send fields */
254 48 : pq_sendint64(out, commit_lsn);
255 48 : pq_sendint64(out, txn->end_lsn);
256 48 : pq_sendint64(out, txn->commit_time);
257 48 : pq_sendint32(out, txn->xid);
258 :
259 : /* send gid */
260 48 : pq_sendstring(out, txn->gid);
261 48 : }
262 :
263 : /*
264 : * Read transaction COMMIT PREPARED from the stream.
265 : */
266 : void
267 40 : logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
268 : {
269 : /* read flags */
270 40 : uint8 flags = pq_getmsgbyte(in);
271 :
272 40 : if (flags != 0)
273 0 : elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
274 :
275 : /* read fields */
276 40 : prepare_data->commit_lsn = pq_getmsgint64(in);
277 40 : if (!XLogRecPtrIsValid(prepare_data->commit_lsn))
278 0 : elog(ERROR, "commit_lsn is not set in commit prepared message");
279 40 : prepare_data->end_lsn = pq_getmsgint64(in);
280 40 : if (!XLogRecPtrIsValid(prepare_data->end_lsn))
281 0 : elog(ERROR, "end_lsn is not set in commit prepared message");
282 40 : prepare_data->commit_time = pq_getmsgint64(in);
283 40 : prepare_data->xid = pq_getmsgint(in, 4);
284 :
285 : /* read gid (copy it into a pre-allocated buffer) */
286 40 : strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
287 40 : }
288 :
289 : /*
290 : * Write ROLLBACK PREPARED to the output stream.
291 : */
292 : void
293 14 : logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
294 : XLogRecPtr prepare_end_lsn,
295 : TimestampTz prepare_time)
296 : {
297 14 : uint8 flags = 0;
298 :
299 14 : pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
300 :
301 : /*
302 : * This should only ever happen for two-phase commit transactions, in
303 : * which case we expect to have a valid GID.
304 : */
305 : Assert(txn->gid != NULL);
306 :
307 : /* send the flags field */
308 14 : pq_sendbyte(out, flags);
309 :
310 : /* send fields */
311 14 : pq_sendint64(out, prepare_end_lsn);
312 14 : pq_sendint64(out, txn->end_lsn);
313 14 : pq_sendint64(out, prepare_time);
314 14 : pq_sendint64(out, txn->commit_time);
315 14 : pq_sendint32(out, txn->xid);
316 :
317 : /* send gid */
318 14 : pq_sendstring(out, txn->gid);
319 14 : }
320 :
321 : /*
322 : * Read transaction ROLLBACK PREPARED from the stream.
323 : */
324 : void
325 10 : logicalrep_read_rollback_prepared(StringInfo in,
326 : LogicalRepRollbackPreparedTxnData *rollback_data)
327 : {
328 : /* read flags */
329 10 : uint8 flags = pq_getmsgbyte(in);
330 :
331 10 : if (flags != 0)
332 0 : elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
333 :
334 : /* read fields */
335 10 : rollback_data->prepare_end_lsn = pq_getmsgint64(in);
336 10 : if (!XLogRecPtrIsValid(rollback_data->prepare_end_lsn))
337 0 : elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
338 10 : rollback_data->rollback_end_lsn = pq_getmsgint64(in);
339 10 : if (!XLogRecPtrIsValid(rollback_data->rollback_end_lsn))
340 0 : elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
341 10 : rollback_data->prepare_time = pq_getmsgint64(in);
342 10 : rollback_data->rollback_time = pq_getmsgint64(in);
343 10 : rollback_data->xid = pq_getmsgint(in, 4);
344 :
345 : /* read gid (copy it into a pre-allocated buffer) */
346 10 : strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
347 10 : }
348 :
349 : /*
350 : * Write STREAM PREPARE to the output stream.
351 : */
352 : void
353 28 : logicalrep_write_stream_prepare(StringInfo out,
354 : ReorderBufferTXN *txn,
355 : XLogRecPtr prepare_lsn)
356 : {
357 28 : logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE,
358 : txn, prepare_lsn);
359 28 : }
360 :
361 : /*
362 : * Read STREAM PREPARE from the stream.
363 : */
364 : void
365 22 : logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
366 : {
367 22 : logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
368 22 : }
369 :
370 : /*
371 : * Write ORIGIN to the output stream.
372 : */
373 : void
374 20 : logicalrep_write_origin(StringInfo out, const char *origin,
375 : XLogRecPtr origin_lsn)
376 : {
377 20 : pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
378 :
379 : /* fixed fields */
380 20 : pq_sendint64(out, origin_lsn);
381 :
382 : /* origin string */
383 20 : pq_sendstring(out, origin);
384 20 : }
385 :
386 : /*
387 : * Read ORIGIN from the output stream.
388 : */
389 : char *
390 0 : logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
391 : {
392 : /* fixed fields */
393 0 : *origin_lsn = pq_getmsgint64(in);
394 :
395 : /* return origin */
396 0 : return pstrdup(pq_getmsgstring(in));
397 : }
398 :
399 : /*
400 : * Write INSERT to the output stream.
401 : */
402 : void
403 211838 : logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
404 : TupleTableSlot *newslot, bool binary,
405 : Bitmapset *columns,
406 : PublishGencolsType include_gencols_type)
407 : {
408 211838 : pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
409 :
410 : /* transaction ID (if not valid, we're not streaming) */
411 211838 : if (TransactionIdIsValid(xid))
412 200168 : pq_sendint32(out, xid);
413 :
414 : /* use Oid as relation identifier */
415 211838 : pq_sendint32(out, RelationGetRelid(rel));
416 :
417 211838 : pq_sendbyte(out, 'N'); /* new tuple follows */
418 211838 : logicalrep_write_tuple(out, rel, newslot, binary, columns,
419 : include_gencols_type);
420 211838 : }
421 :
422 : /*
423 : * Read INSERT from stream.
424 : *
425 : * Fills the new tuple.
426 : */
427 : LogicalRepRelId
428 152188 : logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
429 : {
430 : char action;
431 : LogicalRepRelId relid;
432 :
433 : /* read the relation id */
434 152188 : relid = pq_getmsgint(in, 4);
435 :
436 152188 : action = pq_getmsgbyte(in);
437 152188 : if (action != 'N')
438 0 : elog(ERROR, "expected new tuple but got %d",
439 : action);
440 :
441 152188 : logicalrep_read_tuple(in, newtup);
442 :
443 152188 : return relid;
444 : }
445 :
446 : /*
447 : * Write UPDATE to the output stream.
448 : */
449 : void
450 68892 : logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
451 : TupleTableSlot *oldslot, TupleTableSlot *newslot,
452 : bool binary, Bitmapset *columns,
453 : PublishGencolsType include_gencols_type)
454 : {
455 68892 : pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
456 :
457 : Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
458 : rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
459 : rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
460 :
461 : /* transaction ID (if not valid, we're not streaming) */
462 68892 : if (TransactionIdIsValid(xid))
463 68464 : pq_sendint32(out, xid);
464 :
465 : /* use Oid as relation identifier */
466 68892 : pq_sendint32(out, RelationGetRelid(rel));
467 :
468 68892 : if (oldslot != NULL)
469 : {
470 266 : if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
471 126 : pq_sendbyte(out, 'O'); /* old tuple follows */
472 : else
473 140 : pq_sendbyte(out, 'K'); /* old key follows */
474 266 : logicalrep_write_tuple(out, rel, oldslot, binary, columns,
475 : include_gencols_type);
476 : }
477 :
478 68892 : pq_sendbyte(out, 'N'); /* new tuple follows */
479 68892 : logicalrep_write_tuple(out, rel, newslot, binary, columns,
480 : include_gencols_type);
481 68892 : }
482 :
483 : /*
484 : * Read UPDATE from stream.
485 : */
486 : LogicalRepRelId
487 63880 : logicalrep_read_update(StringInfo in, bool *has_oldtuple,
488 : LogicalRepTupleData *oldtup,
489 : LogicalRepTupleData *newtup)
490 : {
491 : char action;
492 : LogicalRepRelId relid;
493 :
494 : /* read the relation id */
495 63880 : relid = pq_getmsgint(in, 4);
496 :
497 : /* read and verify action */
498 63880 : action = pq_getmsgbyte(in);
499 63880 : if (action != 'K' && action != 'O' && action != 'N')
500 0 : elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
501 : action);
502 :
503 : /* check for old tuple */
504 63880 : if (action == 'K' || action == 'O')
505 : {
506 270 : logicalrep_read_tuple(in, oldtup);
507 270 : *has_oldtuple = true;
508 :
509 270 : action = pq_getmsgbyte(in);
510 : }
511 : else
512 63610 : *has_oldtuple = false;
513 :
514 : /* check for new tuple */
515 63880 : if (action != 'N')
516 0 : elog(ERROR, "expected action 'N', got %c",
517 : action);
518 :
519 63880 : logicalrep_read_tuple(in, newtup);
520 :
521 63880 : return relid;
522 : }
523 :
524 : /*
525 : * Write DELETE to the output stream.
526 : */
527 : void
528 83772 : logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
529 : TupleTableSlot *oldslot, bool binary,
530 : Bitmapset *columns,
531 : PublishGencolsType include_gencols_type)
532 : {
533 : Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
534 : rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
535 : rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
536 :
537 83772 : pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
538 :
539 : /* transaction ID (if not valid, we're not streaming) */
540 83772 : if (TransactionIdIsValid(xid))
541 83250 : pq_sendint32(out, xid);
542 :
543 : /* use Oid as relation identifier */
544 83772 : pq_sendint32(out, RelationGetRelid(rel));
545 :
546 83772 : if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
547 258 : pq_sendbyte(out, 'O'); /* old tuple follows */
548 : else
549 83514 : pq_sendbyte(out, 'K'); /* old key follows */
550 :
551 83772 : logicalrep_write_tuple(out, rel, oldslot, binary, columns,
552 : include_gencols_type);
553 83772 : }
554 :
555 : /*
556 : * Read DELETE from stream.
557 : *
558 : * Fills the old tuple.
559 : */
560 : LogicalRepRelId
561 80642 : logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
562 : {
563 : char action;
564 : LogicalRepRelId relid;
565 :
566 : /* read the relation id */
567 80642 : relid = pq_getmsgint(in, 4);
568 :
569 : /* read and verify action */
570 80642 : action = pq_getmsgbyte(in);
571 80642 : if (action != 'K' && action != 'O')
572 0 : elog(ERROR, "expected action 'O' or 'K', got %c", action);
573 :
574 80642 : logicalrep_read_tuple(in, oldtup);
575 :
576 80642 : return relid;
577 : }
578 :
579 : /*
580 : * Write TRUNCATE to the output stream.
581 : */
582 : void
583 22 : logicalrep_write_truncate(StringInfo out,
584 : TransactionId xid,
585 : int nrelids,
586 : Oid relids[],
587 : bool cascade, bool restart_seqs)
588 : {
589 : int i;
590 22 : uint8 flags = 0;
591 :
592 22 : pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
593 :
594 : /* transaction ID (if not valid, we're not streaming) */
595 22 : if (TransactionIdIsValid(xid))
596 0 : pq_sendint32(out, xid);
597 :
598 22 : pq_sendint32(out, nrelids);
599 :
600 : /* encode and send truncate flags */
601 22 : if (cascade)
602 0 : flags |= TRUNCATE_CASCADE;
603 22 : if (restart_seqs)
604 0 : flags |= TRUNCATE_RESTART_SEQS;
605 22 : pq_sendint8(out, flags);
606 :
607 56 : for (i = 0; i < nrelids; i++)
608 34 : pq_sendint32(out, relids[i]);
609 22 : }
610 :
611 : /*
612 : * Read TRUNCATE from stream.
613 : */
614 : List *
615 40 : logicalrep_read_truncate(StringInfo in,
616 : bool *cascade, bool *restart_seqs)
617 : {
618 : int i;
619 : int nrelids;
620 40 : List *relids = NIL;
621 : uint8 flags;
622 :
623 40 : nrelids = pq_getmsgint(in, 4);
624 :
625 : /* read and decode truncate flags */
626 40 : flags = pq_getmsgint(in, 1);
627 40 : *cascade = (flags & TRUNCATE_CASCADE) > 0;
628 40 : *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
629 :
630 98 : for (i = 0; i < nrelids; i++)
631 58 : relids = lappend_oid(relids, pq_getmsgint(in, 4));
632 :
633 40 : return relids;
634 : }
635 :
636 : /*
637 : * Write MESSAGE to stream
638 : */
639 : void
640 10 : logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
641 : bool transactional, const char *prefix, Size sz,
642 : const char *message)
643 : {
644 10 : uint8 flags = 0;
645 :
646 10 : pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
647 :
648 : /* encode and send message flags */
649 10 : if (transactional)
650 4 : flags |= MESSAGE_TRANSACTIONAL;
651 :
652 : /* transaction ID (if not valid, we're not streaming) */
653 10 : if (TransactionIdIsValid(xid))
654 0 : pq_sendint32(out, xid);
655 :
656 10 : pq_sendint8(out, flags);
657 10 : pq_sendint64(out, lsn);
658 10 : pq_sendstring(out, prefix);
659 10 : pq_sendint32(out, sz);
660 10 : pq_sendbytes(out, message, sz);
661 10 : }
662 :
663 : /*
664 : * Write relation description to the output stream.
665 : */
666 : void
667 746 : logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
668 : Bitmapset *columns,
669 : PublishGencolsType include_gencols_type)
670 : {
671 : char *relname;
672 :
673 746 : pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
674 :
675 : /* transaction ID (if not valid, we're not streaming) */
676 746 : if (TransactionIdIsValid(xid))
677 144 : pq_sendint32(out, xid);
678 :
679 : /* use Oid as relation identifier */
680 746 : pq_sendint32(out, RelationGetRelid(rel));
681 :
682 : /* send qualified relation name */
683 746 : logicalrep_write_namespace(out, RelationGetNamespace(rel));
684 746 : relname = RelationGetRelationName(rel);
685 746 : pq_sendstring(out, relname);
686 :
687 : /* send replica identity */
688 746 : pq_sendbyte(out, rel->rd_rel->relreplident);
689 :
690 : /* send the attribute info */
691 746 : logicalrep_write_attrs(out, rel, columns, include_gencols_type);
692 746 : }
693 :
694 : /*
695 : * Read the relation info from stream and return as LogicalRepRelation.
696 : */
697 : LogicalRepRelation *
698 856 : logicalrep_read_rel(StringInfo in)
699 : {
700 856 : LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
701 :
702 856 : rel->remoteid = pq_getmsgint(in, 4);
703 :
704 : /* Read relation name from stream */
705 856 : rel->nspname = pstrdup(logicalrep_read_namespace(in));
706 856 : rel->relname = pstrdup(pq_getmsgstring(in));
707 :
708 : /* Read the replica identity. */
709 856 : rel->replident = pq_getmsgbyte(in);
710 :
711 : /* relkind is not sent */
712 856 : rel->relkind = 0;
713 :
714 : /* Get attribute description */
715 856 : logicalrep_read_attrs(in, rel);
716 :
717 856 : return rel;
718 : }
719 :
720 : /*
721 : * Write type info to the output stream.
722 : *
723 : * This function will always write base type info.
724 : */
725 : void
726 36 : logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
727 : {
728 36 : Oid basetypoid = getBaseType(typoid);
729 : HeapTuple tup;
730 : Form_pg_type typtup;
731 :
732 36 : pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
733 :
734 : /* transaction ID (if not valid, we're not streaming) */
735 36 : if (TransactionIdIsValid(xid))
736 0 : pq_sendint32(out, xid);
737 :
738 36 : tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
739 36 : if (!HeapTupleIsValid(tup))
740 0 : elog(ERROR, "cache lookup failed for type %u", basetypoid);
741 36 : typtup = (Form_pg_type) GETSTRUCT(tup);
742 :
743 : /* use Oid as type identifier */
744 36 : pq_sendint32(out, typoid);
745 :
746 : /* send qualified type name */
747 36 : logicalrep_write_namespace(out, typtup->typnamespace);
748 36 : pq_sendstring(out, NameStr(typtup->typname));
749 :
750 36 : ReleaseSysCache(tup);
751 36 : }
752 :
753 : /*
754 : * Read type info from the output stream.
755 : */
756 : void
757 36 : logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
758 : {
759 36 : ltyp->remoteid = pq_getmsgint(in, 4);
760 :
761 : /* Read type name from stream */
762 36 : ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
763 36 : ltyp->typname = pstrdup(pq_getmsgstring(in));
764 36 : }
765 :
766 : /*
767 : * Write a tuple to the outputstream, in the most efficient format possible.
768 : */
769 : static void
770 364768 : logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
771 : bool binary, Bitmapset *columns,
772 : PublishGencolsType include_gencols_type)
773 : {
774 : TupleDesc desc;
775 : Datum *values;
776 : bool *isnull;
777 : int i;
778 364768 : uint16 nliveatts = 0;
779 :
780 364768 : desc = RelationGetDescr(rel);
781 :
782 1097334 : for (i = 0; i < desc->natts; i++)
783 : {
784 732566 : Form_pg_attribute att = TupleDescAttr(desc, i);
785 :
786 732566 : if (!logicalrep_should_publish_column(att, columns,
787 : include_gencols_type))
788 282 : continue;
789 :
790 732284 : nliveatts++;
791 : }
792 364768 : pq_sendint16(out, nliveatts);
793 :
794 364768 : slot_getallattrs(slot);
795 364768 : values = slot->tts_values;
796 364768 : isnull = slot->tts_isnull;
797 :
798 : /* Write the values */
799 1097334 : for (i = 0; i < desc->natts; i++)
800 : {
801 : HeapTuple typtup;
802 : Form_pg_type typclass;
803 732566 : Form_pg_attribute att = TupleDescAttr(desc, i);
804 :
805 732566 : if (!logicalrep_should_publish_column(att, columns,
806 : include_gencols_type))
807 282 : continue;
808 :
809 732284 : if (isnull[i])
810 : {
811 103852 : pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
812 103852 : continue;
813 : }
814 :
815 628432 : if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(values[i])))
816 : {
817 : /*
818 : * Unchanged toasted datum. (Note that we don't promise to detect
819 : * unchanged data in general; this is just a cheap check to avoid
820 : * sending large values unnecessarily.)
821 : */
822 6 : pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
823 6 : continue;
824 : }
825 :
826 628426 : typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
827 628426 : if (!HeapTupleIsValid(typtup))
828 0 : elog(ERROR, "cache lookup failed for type %u", att->atttypid);
829 628426 : typclass = (Form_pg_type) GETSTRUCT(typtup);
830 :
831 : /*
832 : * Send in binary if requested and type has suitable send function.
833 : */
834 628426 : if (binary && OidIsValid(typclass->typsend))
835 230086 : {
836 : bytea *outputbytes;
837 : int len;
838 :
839 230086 : pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
840 230086 : outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
841 230086 : len = VARSIZE(outputbytes) - VARHDRSZ;
842 230086 : pq_sendint(out, len, 4); /* length */
843 230086 : pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
844 230086 : pfree(outputbytes);
845 : }
846 : else
847 : {
848 : char *outputstr;
849 :
850 398340 : pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
851 398340 : outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
852 398340 : pq_sendcountedtext(out, outputstr, strlen(outputstr));
853 398340 : pfree(outputstr);
854 : }
855 :
856 628426 : ReleaseSysCache(typtup);
857 : }
858 364768 : }
859 :
860 : /*
861 : * Read tuple in logical replication format from stream.
862 : */
863 : static void
864 296980 : logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
865 : {
866 : int i;
867 : int natts;
868 :
869 : /* Get number of attributes */
870 296980 : natts = pq_getmsgint(in, 2);
871 :
872 : /* Allocate space for per-column values; zero out unused StringInfoDatas */
873 296980 : tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
874 296980 : tuple->colstatus = (char *) palloc(natts * sizeof(char));
875 296980 : tuple->ncols = natts;
876 :
877 : /* Read the data */
878 903324 : for (i = 0; i < natts; i++)
879 : {
880 : char *buff;
881 : char kind;
882 : int len;
883 606344 : StringInfo value = &tuple->colvalues[i];
884 :
885 606344 : kind = pq_getmsgbyte(in);
886 606344 : tuple->colstatus[i] = kind;
887 :
888 606344 : switch (kind)
889 : {
890 100744 : case LOGICALREP_COLUMN_NULL:
891 : /* nothing more to do */
892 100744 : break;
893 6 : case LOGICALREP_COLUMN_UNCHANGED:
894 : /* we don't receive the value of an unchanged column */
895 6 : break;
896 505594 : case LOGICALREP_COLUMN_TEXT:
897 : case LOGICALREP_COLUMN_BINARY:
898 505594 : len = pq_getmsgint(in, 4); /* read length */
899 :
900 : /* and data */
901 505594 : buff = palloc(len + 1);
902 505594 : pq_copymsgbytes(in, buff, len);
903 :
904 : /*
905 : * NUL termination is required for LOGICALREP_COLUMN_TEXT mode
906 : * as input functions require that. For
907 : * LOGICALREP_COLUMN_BINARY it's not technically required, but
908 : * it's harmless.
909 : */
910 505594 : buff[len] = '\0';
911 :
912 505594 : initStringInfoFromString(value, buff, len);
913 505594 : break;
914 0 : default:
915 0 : elog(ERROR, "unrecognized data representation type '%c'", kind);
916 : }
917 : }
918 296980 : }
919 :
920 : /*
921 : * Write relation attribute metadata to the stream.
922 : */
923 : static void
924 746 : logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns,
925 : PublishGencolsType include_gencols_type)
926 : {
927 : TupleDesc desc;
928 : int i;
929 746 : uint16 nliveatts = 0;
930 746 : Bitmapset *idattrs = NULL;
931 : bool replidentfull;
932 :
933 746 : desc = RelationGetDescr(rel);
934 :
935 : /* send number of live attributes */
936 2316 : for (i = 0; i < desc->natts; i++)
937 : {
938 1570 : Form_pg_attribute att = TupleDescAttr(desc, i);
939 :
940 1570 : if (!logicalrep_should_publish_column(att, columns,
941 : include_gencols_type))
942 142 : continue;
943 :
944 1428 : nliveatts++;
945 : }
946 746 : pq_sendint16(out, nliveatts);
947 :
948 : /* fetch bitmap of REPLICATION IDENTITY attributes */
949 746 : replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
950 746 : if (!replidentfull)
951 634 : idattrs = RelationGetIdentityKeyBitmap(rel);
952 :
953 : /* send the attributes */
954 2316 : for (i = 0; i < desc->natts; i++)
955 : {
956 1570 : Form_pg_attribute att = TupleDescAttr(desc, i);
957 1570 : uint8 flags = 0;
958 :
959 1570 : if (!logicalrep_should_publish_column(att, columns,
960 : include_gencols_type))
961 142 : continue;
962 :
963 : /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
964 2684 : if (replidentfull ||
965 1256 : bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
966 : idattrs))
967 678 : flags |= LOGICALREP_IS_REPLICA_IDENTITY;
968 :
969 1428 : pq_sendbyte(out, flags);
970 :
971 : /* attribute name */
972 1428 : pq_sendstring(out, NameStr(att->attname));
973 :
974 : /* attribute type id */
975 1428 : pq_sendint32(out, (int) att->atttypid);
976 :
977 : /* attribute mode */
978 1428 : pq_sendint32(out, att->atttypmod);
979 : }
980 :
981 746 : bms_free(idattrs);
982 746 : }
983 :
984 : /*
985 : * Read relation attribute metadata from the stream.
986 : */
987 : static void
988 856 : logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
989 : {
990 : int i;
991 : int natts;
992 : char **attnames;
993 : Oid *atttyps;
994 856 : Bitmapset *attkeys = NULL;
995 :
996 856 : natts = pq_getmsgint(in, 2);
997 856 : attnames = palloc(natts * sizeof(char *));
998 856 : atttyps = palloc(natts * sizeof(Oid));
999 :
1000 : /* read the attributes */
1001 2464 : for (i = 0; i < natts; i++)
1002 : {
1003 : uint8 flags;
1004 :
1005 : /* Check for replica identity column */
1006 1608 : flags = pq_getmsgbyte(in);
1007 1608 : if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
1008 764 : attkeys = bms_add_member(attkeys, i);
1009 :
1010 : /* attribute name */
1011 1608 : attnames[i] = pstrdup(pq_getmsgstring(in));
1012 :
1013 : /* attribute type id */
1014 1608 : atttyps[i] = (Oid) pq_getmsgint(in, 4);
1015 :
1016 : /* we ignore attribute mode for now */
1017 1608 : (void) pq_getmsgint(in, 4);
1018 : }
1019 :
1020 856 : rel->attnames = attnames;
1021 856 : rel->atttyps = atttyps;
1022 856 : rel->attkeys = attkeys;
1023 856 : rel->natts = natts;
1024 856 : }
1025 :
1026 : /*
1027 : * Write the namespace name or empty string for pg_catalog (to save space).
1028 : */
1029 : static void
1030 782 : logicalrep_write_namespace(StringInfo out, Oid nspid)
1031 : {
1032 782 : if (nspid == PG_CATALOG_NAMESPACE)
1033 2 : pq_sendbyte(out, '\0');
1034 : else
1035 : {
1036 780 : char *nspname = get_namespace_name(nspid);
1037 :
1038 780 : if (nspname == NULL)
1039 0 : elog(ERROR, "cache lookup failed for namespace %u",
1040 : nspid);
1041 :
1042 780 : pq_sendstring(out, nspname);
1043 : }
1044 782 : }
1045 :
1046 : /*
1047 : * Read the namespace name while treating empty string as pg_catalog.
1048 : */
1049 : static const char *
1050 892 : logicalrep_read_namespace(StringInfo in)
1051 : {
1052 892 : const char *nspname = pq_getmsgstring(in);
1053 :
1054 892 : if (nspname[0] == '\0')
1055 2 : nspname = "pg_catalog";
1056 :
1057 892 : return nspname;
1058 : }
1059 :
1060 : /*
1061 : * Write the information for the start stream message to the output stream.
1062 : */
1063 : void
1064 1276 : logicalrep_write_stream_start(StringInfo out,
1065 : TransactionId xid, bool first_segment)
1066 : {
1067 1276 : pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
1068 :
1069 : Assert(TransactionIdIsValid(xid));
1070 :
1071 : /* transaction ID (we're starting to stream, so must be valid) */
1072 1276 : pq_sendint32(out, xid);
1073 :
1074 : /* 1 if this is the first streaming segment for this xid */
1075 1276 : pq_sendbyte(out, first_segment ? 1 : 0);
1076 1276 : }
1077 :
1078 : /*
1079 : * Read the information about the start stream message from output stream.
1080 : */
1081 : TransactionId
1082 1678 : logicalrep_read_stream_start(StringInfo in, bool *first_segment)
1083 : {
1084 : TransactionId xid;
1085 :
1086 : Assert(first_segment);
1087 :
1088 1678 : xid = pq_getmsgint(in, 4);
1089 1678 : *first_segment = (pq_getmsgbyte(in) == 1);
1090 :
1091 1678 : return xid;
1092 : }
1093 :
1094 : /*
1095 : * Write the stop stream message to the output stream.
1096 : */
1097 : void
1098 1276 : logicalrep_write_stream_stop(StringInfo out)
1099 : {
1100 1276 : pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_STOP);
1101 1276 : }
1102 :
1103 : /*
1104 : * Write STREAM COMMIT to the output stream.
1105 : */
1106 : void
1107 92 : logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
1108 : XLogRecPtr commit_lsn)
1109 : {
1110 92 : uint8 flags = 0;
1111 :
1112 92 : pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
1113 :
1114 : Assert(TransactionIdIsValid(txn->xid));
1115 :
1116 : /* transaction ID */
1117 92 : pq_sendint32(out, txn->xid);
1118 :
1119 : /* send the flags field (unused for now) */
1120 92 : pq_sendbyte(out, flags);
1121 :
1122 : /* send fields */
1123 92 : pq_sendint64(out, commit_lsn);
1124 92 : pq_sendint64(out, txn->end_lsn);
1125 92 : pq_sendint64(out, txn->commit_time);
1126 92 : }
1127 :
1128 : /*
1129 : * Read STREAM COMMIT from the output stream.
1130 : */
1131 : TransactionId
1132 122 : logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
1133 : {
1134 : TransactionId xid;
1135 : uint8 flags;
1136 :
1137 122 : xid = pq_getmsgint(in, 4);
1138 :
1139 : /* read flags (unused for now) */
1140 122 : flags = pq_getmsgbyte(in);
1141 :
1142 122 : if (flags != 0)
1143 0 : elog(ERROR, "unrecognized flags %u in commit message", flags);
1144 :
1145 : /* read fields */
1146 122 : commit_data->commit_lsn = pq_getmsgint64(in);
1147 122 : commit_data->end_lsn = pq_getmsgint64(in);
1148 122 : commit_data->committime = pq_getmsgint64(in);
1149 :
1150 122 : return xid;
1151 : }
1152 :
1153 : /*
1154 : * Write STREAM ABORT to the output stream. Note that xid and subxid will be
1155 : * same for the top-level transaction abort.
1156 : *
1157 : * If write_abort_info is true, send the abort_lsn and abort_time fields,
1158 : * otherwise don't.
1159 : */
1160 : void
1161 52 : logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
1162 : TransactionId subxid, XLogRecPtr abort_lsn,
1163 : TimestampTz abort_time, bool write_abort_info)
1164 : {
1165 52 : pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
1166 :
1167 : Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
1168 :
1169 : /* transaction ID */
1170 52 : pq_sendint32(out, xid);
1171 52 : pq_sendint32(out, subxid);
1172 :
1173 52 : if (write_abort_info)
1174 : {
1175 24 : pq_sendint64(out, abort_lsn);
1176 24 : pq_sendint64(out, abort_time);
1177 : }
1178 52 : }
1179 :
1180 : /*
1181 : * Read STREAM ABORT from the output stream.
1182 : *
1183 : * If read_abort_info is true, read the abort_lsn and abort_time fields,
1184 : * otherwise don't.
1185 : */
1186 : void
1187 76 : logicalrep_read_stream_abort(StringInfo in,
1188 : LogicalRepStreamAbortData *abort_data,
1189 : bool read_abort_info)
1190 : {
1191 : Assert(abort_data);
1192 :
1193 76 : abort_data->xid = pq_getmsgint(in, 4);
1194 76 : abort_data->subxid = pq_getmsgint(in, 4);
1195 :
1196 76 : if (read_abort_info)
1197 : {
1198 48 : abort_data->abort_lsn = pq_getmsgint64(in);
1199 48 : abort_data->abort_time = pq_getmsgint64(in);
1200 : }
1201 : else
1202 : {
1203 28 : abort_data->abort_lsn = InvalidXLogRecPtr;
1204 28 : abort_data->abort_time = 0;
1205 : }
1206 76 : }
1207 :
1208 : /*
1209 : * Get string representing LogicalRepMsgType.
1210 : */
1211 : const char *
1212 2026 : logicalrep_message_type(LogicalRepMsgType action)
1213 : {
1214 : static char err_unknown[20];
1215 :
1216 2026 : switch (action)
1217 : {
1218 2 : case LOGICAL_REP_MSG_BEGIN:
1219 2 : return "BEGIN";
1220 12 : case LOGICAL_REP_MSG_COMMIT:
1221 12 : return "COMMIT";
1222 0 : case LOGICAL_REP_MSG_ORIGIN:
1223 0 : return "ORIGIN";
1224 1350 : case LOGICAL_REP_MSG_INSERT:
1225 1350 : return "INSERT";
1226 44 : case LOGICAL_REP_MSG_UPDATE:
1227 44 : return "UPDATE";
1228 28 : case LOGICAL_REP_MSG_DELETE:
1229 28 : return "DELETE";
1230 0 : case LOGICAL_REP_MSG_TRUNCATE:
1231 0 : return "TRUNCATE";
1232 4 : case LOGICAL_REP_MSG_RELATION:
1233 4 : return "RELATION";
1234 0 : case LOGICAL_REP_MSG_TYPE:
1235 0 : return "TYPE";
1236 0 : case LOGICAL_REP_MSG_MESSAGE:
1237 0 : return "MESSAGE";
1238 2 : case LOGICAL_REP_MSG_BEGIN_PREPARE:
1239 2 : return "BEGIN PREPARE";
1240 4 : case LOGICAL_REP_MSG_PREPARE:
1241 4 : return "PREPARE";
1242 0 : case LOGICAL_REP_MSG_COMMIT_PREPARED:
1243 0 : return "COMMIT PREPARED";
1244 0 : case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
1245 0 : return "ROLLBACK PREPARED";
1246 26 : case LOGICAL_REP_MSG_STREAM_START:
1247 26 : return "STREAM START";
1248 462 : case LOGICAL_REP_MSG_STREAM_STOP:
1249 462 : return "STREAM STOP";
1250 50 : case LOGICAL_REP_MSG_STREAM_COMMIT:
1251 50 : return "STREAM COMMIT";
1252 38 : case LOGICAL_REP_MSG_STREAM_ABORT:
1253 38 : return "STREAM ABORT";
1254 4 : case LOGICAL_REP_MSG_STREAM_PREPARE:
1255 4 : return "STREAM PREPARE";
1256 : }
1257 :
1258 : /*
1259 : * This message provides context in the error raised when applying a
1260 : * logical message. So we can't throw an error here. Return an unknown
1261 : * indicator value so that the original error is still reported.
1262 : */
1263 0 : snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);
1264 :
1265 0 : return err_unknown;
1266 : }
1267 :
1268 : /*
1269 : * Check if the column 'att' of a table should be published.
1270 : *
1271 : * 'columns' represents the publication column list (if any) for that table.
1272 : *
1273 : * 'include_gencols_type' value indicates whether generated columns should be
1274 : * published when there is no column list. Typically, this will have the same
1275 : * value as the 'publish_generated_columns' publication parameter.
1276 : *
1277 : * Note that generated columns can be published only when present in a
1278 : * publication column list, or when include_gencols_type is
1279 : * PUBLISH_GENCOLS_STORED.
1280 : */
1281 : bool
1282 1469842 : logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns,
1283 : PublishGencolsType include_gencols_type)
1284 : {
1285 1469842 : if (att->attisdropped)
1286 102 : return false;
1287 :
1288 : /* If a column list is provided, publish only the cols in that list. */
1289 1469740 : if (columns)
1290 1898 : return bms_is_member(att->attnum, columns);
1291 :
1292 : /* All non-generated columns are always published. */
1293 1467842 : if (!att->attgenerated)
1294 1467728 : return true;
1295 :
1296 : /*
1297 : * Stored generated columns are only published when the user sets
1298 : * publish_generated_columns as stored.
1299 : */
1300 114 : if (att->attgenerated == ATTRIBUTE_GENERATED_STORED)
1301 66 : return include_gencols_type == PUBLISH_GENCOLS_STORED;
1302 :
1303 48 : return false;
1304 : }
|