Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * proto.c
4 : * logical replication protocol functions
5 : *
6 : * Copyright (c) 2015-2024, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/logical/proto.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/sysattr.h"
16 : #include "catalog/pg_namespace.h"
17 : #include "catalog/pg_type.h"
18 : #include "libpq/pqformat.h"
19 : #include "replication/logicalproto.h"
20 : #include "utils/lsyscache.h"
21 : #include "utils/syscache.h"
22 :
/*
 * Bit values carried in the flag bytes of protocol messages.
 *
 * The attribute, MESSAGE and TRUNCATE flag bytes are written by different
 * functions into different messages, so their bit values may coincide.
 */

/* per-attribute flag: the column is part of the replica identity */
#define LOGICALREP_IS_REPLICA_IDENTITY	0x01

/* MESSAGE flag */
#define MESSAGE_TRANSACTIONAL			0x01

/* TRUNCATE flags */
#define TRUNCATE_CASCADE				0x01
#define TRUNCATE_RESTART_SEQS			0x02
31 :
32 : static void logicalrep_write_attrs(StringInfo out, Relation rel,
33 : Bitmapset *columns, bool include_gencols);
34 : static void logicalrep_write_tuple(StringInfo out, Relation rel,
35 : TupleTableSlot *slot,
36 : bool binary, Bitmapset *columns,
37 : bool include_gencols);
38 : static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
39 : static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
40 :
41 : static void logicalrep_write_namespace(StringInfo out, Oid nspid);
42 : static const char *logicalrep_read_namespace(StringInfo in);
43 :
44 : /*
45 : * Write BEGIN to the output stream.
46 : */
47 : void
48 770 : logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
49 : {
50 770 : pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
51 :
52 : /* fixed fields */
53 770 : pq_sendint64(out, txn->final_lsn);
54 770 : pq_sendint64(out, txn->xact_time.commit_time);
55 770 : pq_sendint32(out, txn->xid);
56 770 : }
57 :
58 : /*
59 : * Read transaction BEGIN from the stream.
60 : */
61 : void
62 838 : logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
63 : {
64 : /* read fields */
65 838 : begin_data->final_lsn = pq_getmsgint64(in);
66 838 : if (begin_data->final_lsn == InvalidXLogRecPtr)
67 0 : elog(ERROR, "final_lsn not set in begin message");
68 838 : begin_data->committime = pq_getmsgint64(in);
69 838 : begin_data->xid = pq_getmsgint(in, 4);
70 838 : }
71 :
72 :
73 : /*
74 : * Write COMMIT to the output stream.
75 : */
76 : void
77 768 : logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
78 : XLogRecPtr commit_lsn)
79 : {
80 768 : uint8 flags = 0;
81 :
82 768 : pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
83 :
84 : /* send the flags field (unused for now) */
85 768 : pq_sendbyte(out, flags);
86 :
87 : /* send fields */
88 768 : pq_sendint64(out, commit_lsn);
89 768 : pq_sendint64(out, txn->end_lsn);
90 768 : pq_sendint64(out, txn->xact_time.commit_time);
91 768 : }
92 :
93 : /*
94 : * Read transaction COMMIT from the stream.
95 : */
96 : void
97 790 : logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
98 : {
99 : /* read flags (unused for now) */
100 790 : uint8 flags = pq_getmsgbyte(in);
101 :
102 790 : if (flags != 0)
103 0 : elog(ERROR, "unrecognized flags %u in commit message", flags);
104 :
105 : /* read fields */
106 790 : commit_data->commit_lsn = pq_getmsgint64(in);
107 790 : commit_data->end_lsn = pq_getmsgint64(in);
108 790 : commit_data->committime = pq_getmsgint64(in);
109 790 : }
110 :
111 : /*
112 : * Write BEGIN PREPARE to the output stream.
113 : */
114 : void
115 40 : logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
116 : {
117 40 : pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE);
118 :
119 : /* fixed fields */
120 40 : pq_sendint64(out, txn->final_lsn);
121 40 : pq_sendint64(out, txn->end_lsn);
122 40 : pq_sendint64(out, txn->xact_time.prepare_time);
123 40 : pq_sendint32(out, txn->xid);
124 :
125 : /* send gid */
126 40 : pq_sendstring(out, txn->gid);
127 40 : }
128 :
129 : /*
130 : * Read transaction BEGIN PREPARE from the stream.
131 : */
132 : void
133 32 : logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
134 : {
135 : /* read fields */
136 32 : begin_data->prepare_lsn = pq_getmsgint64(in);
137 32 : if (begin_data->prepare_lsn == InvalidXLogRecPtr)
138 0 : elog(ERROR, "prepare_lsn not set in begin prepare message");
139 32 : begin_data->end_lsn = pq_getmsgint64(in);
140 32 : if (begin_data->end_lsn == InvalidXLogRecPtr)
141 0 : elog(ERROR, "end_lsn not set in begin prepare message");
142 32 : begin_data->prepare_time = pq_getmsgint64(in);
143 32 : begin_data->xid = pq_getmsgint(in, 4);
144 :
145 : /* read gid (copy it into a pre-allocated buffer) */
146 32 : strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
147 32 : }
148 :
149 : /*
150 : * The core functionality for logicalrep_write_prepare and
151 : * logicalrep_write_stream_prepare.
152 : */
153 : static void
154 68 : logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
155 : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
156 : {
157 68 : uint8 flags = 0;
158 :
159 68 : pq_sendbyte(out, type);
160 :
161 : /*
162 : * This should only ever happen for two-phase commit transactions, in
163 : * which case we expect to have a valid GID.
164 : */
165 : Assert(txn->gid != NULL);
166 : Assert(rbtxn_prepared(txn));
167 : Assert(TransactionIdIsValid(txn->xid));
168 :
169 : /* send the flags field */
170 68 : pq_sendbyte(out, flags);
171 :
172 : /* send fields */
173 68 : pq_sendint64(out, prepare_lsn);
174 68 : pq_sendint64(out, txn->end_lsn);
175 68 : pq_sendint64(out, txn->xact_time.prepare_time);
176 68 : pq_sendint32(out, txn->xid);
177 :
178 : /* send gid */
179 68 : pq_sendstring(out, txn->gid);
180 68 : }
181 :
182 : /*
183 : * Write PREPARE to the output stream.
184 : */
185 : void
186 40 : logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
187 : XLogRecPtr prepare_lsn)
188 : {
189 40 : logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
190 : txn, prepare_lsn);
191 40 : }
192 :
193 : /*
194 : * The core functionality for logicalrep_read_prepare and
195 : * logicalrep_read_stream_prepare.
196 : */
197 : static void
198 52 : logicalrep_read_prepare_common(StringInfo in, char *msgtype,
199 : LogicalRepPreparedTxnData *prepare_data)
200 : {
201 : /* read flags */
202 52 : uint8 flags = pq_getmsgbyte(in);
203 :
204 52 : if (flags != 0)
205 0 : elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
206 :
207 : /* read fields */
208 52 : prepare_data->prepare_lsn = pq_getmsgint64(in);
209 52 : if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
210 0 : elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
211 52 : prepare_data->end_lsn = pq_getmsgint64(in);
212 52 : if (prepare_data->end_lsn == InvalidXLogRecPtr)
213 0 : elog(ERROR, "end_lsn is not set in %s message", msgtype);
214 52 : prepare_data->prepare_time = pq_getmsgint64(in);
215 52 : prepare_data->xid = pq_getmsgint(in, 4);
216 52 : if (prepare_data->xid == InvalidTransactionId)
217 0 : elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
218 :
219 : /* read gid (copy it into a pre-allocated buffer) */
220 52 : strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
221 52 : }
222 :
223 : /*
224 : * Read transaction PREPARE from the stream.
225 : */
226 : void
227 30 : logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
228 : {
229 30 : logicalrep_read_prepare_common(in, "prepare", prepare_data);
230 30 : }
231 :
232 : /*
233 : * Write COMMIT PREPARED to the output stream.
234 : */
235 : void
236 48 : logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
237 : XLogRecPtr commit_lsn)
238 : {
239 48 : uint8 flags = 0;
240 :
241 48 : pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
242 :
243 : /*
244 : * This should only ever happen for two-phase commit transactions, in
245 : * which case we expect to have a valid GID.
246 : */
247 : Assert(txn->gid != NULL);
248 :
249 : /* send the flags field */
250 48 : pq_sendbyte(out, flags);
251 :
252 : /* send fields */
253 48 : pq_sendint64(out, commit_lsn);
254 48 : pq_sendint64(out, txn->end_lsn);
255 48 : pq_sendint64(out, txn->xact_time.commit_time);
256 48 : pq_sendint32(out, txn->xid);
257 :
258 : /* send gid */
259 48 : pq_sendstring(out, txn->gid);
260 48 : }
261 :
262 : /*
263 : * Read transaction COMMIT PREPARED from the stream.
264 : */
265 : void
266 40 : logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
267 : {
268 : /* read flags */
269 40 : uint8 flags = pq_getmsgbyte(in);
270 :
271 40 : if (flags != 0)
272 0 : elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
273 :
274 : /* read fields */
275 40 : prepare_data->commit_lsn = pq_getmsgint64(in);
276 40 : if (prepare_data->commit_lsn == InvalidXLogRecPtr)
277 0 : elog(ERROR, "commit_lsn is not set in commit prepared message");
278 40 : prepare_data->end_lsn = pq_getmsgint64(in);
279 40 : if (prepare_data->end_lsn == InvalidXLogRecPtr)
280 0 : elog(ERROR, "end_lsn is not set in commit prepared message");
281 40 : prepare_data->commit_time = pq_getmsgint64(in);
282 40 : prepare_data->xid = pq_getmsgint(in, 4);
283 :
284 : /* read gid (copy it into a pre-allocated buffer) */
285 40 : strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
286 40 : }
287 :
288 : /*
289 : * Write ROLLBACK PREPARED to the output stream.
290 : */
291 : void
292 18 : logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
293 : XLogRecPtr prepare_end_lsn,
294 : TimestampTz prepare_time)
295 : {
296 18 : uint8 flags = 0;
297 :
298 18 : pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
299 :
300 : /*
301 : * This should only ever happen for two-phase commit transactions, in
302 : * which case we expect to have a valid GID.
303 : */
304 : Assert(txn->gid != NULL);
305 :
306 : /* send the flags field */
307 18 : pq_sendbyte(out, flags);
308 :
309 : /* send fields */
310 18 : pq_sendint64(out, prepare_end_lsn);
311 18 : pq_sendint64(out, txn->end_lsn);
312 18 : pq_sendint64(out, prepare_time);
313 18 : pq_sendint64(out, txn->xact_time.commit_time);
314 18 : pq_sendint32(out, txn->xid);
315 :
316 : /* send gid */
317 18 : pq_sendstring(out, txn->gid);
318 18 : }
319 :
320 : /*
321 : * Read transaction ROLLBACK PREPARED from the stream.
322 : */
323 : void
324 10 : logicalrep_read_rollback_prepared(StringInfo in,
325 : LogicalRepRollbackPreparedTxnData *rollback_data)
326 : {
327 : /* read flags */
328 10 : uint8 flags = pq_getmsgbyte(in);
329 :
330 10 : if (flags != 0)
331 0 : elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
332 :
333 : /* read fields */
334 10 : rollback_data->prepare_end_lsn = pq_getmsgint64(in);
335 10 : if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
336 0 : elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
337 10 : rollback_data->rollback_end_lsn = pq_getmsgint64(in);
338 10 : if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
339 0 : elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
340 10 : rollback_data->prepare_time = pq_getmsgint64(in);
341 10 : rollback_data->rollback_time = pq_getmsgint64(in);
342 10 : rollback_data->xid = pq_getmsgint(in, 4);
343 :
344 : /* read gid (copy it into a pre-allocated buffer) */
345 10 : strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
346 10 : }
347 :
348 : /*
349 : * Write STREAM PREPARE to the output stream.
350 : */
351 : void
352 28 : logicalrep_write_stream_prepare(StringInfo out,
353 : ReorderBufferTXN *txn,
354 : XLogRecPtr prepare_lsn)
355 : {
356 28 : logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE,
357 : txn, prepare_lsn);
358 28 : }
359 :
360 : /*
361 : * Read STREAM PREPARE from the stream.
362 : */
363 : void
364 22 : logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
365 : {
366 22 : logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
367 22 : }
368 :
369 : /*
370 : * Write ORIGIN to the output stream.
371 : */
372 : void
373 18 : logicalrep_write_origin(StringInfo out, const char *origin,
374 : XLogRecPtr origin_lsn)
375 : {
376 18 : pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
377 :
378 : /* fixed fields */
379 18 : pq_sendint64(out, origin_lsn);
380 :
381 : /* origin string */
382 18 : pq_sendstring(out, origin);
383 18 : }
384 :
385 : /*
386 : * Read ORIGIN from the output stream.
387 : */
388 : char *
389 0 : logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
390 : {
391 : /* fixed fields */
392 0 : *origin_lsn = pq_getmsgint64(in);
393 :
394 : /* return origin */
395 0 : return pstrdup(pq_getmsgstring(in));
396 : }
397 :
398 : /*
399 : * Write INSERT to the output stream.
400 : */
401 : void
402 211700 : logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
403 : TupleTableSlot *newslot, bool binary,
404 : Bitmapset *columns, bool include_gencols)
405 : {
406 211700 : pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
407 :
408 : /* transaction ID (if not valid, we're not streaming) */
409 211700 : if (TransactionIdIsValid(xid))
410 200168 : pq_sendint32(out, xid);
411 :
412 : /* use Oid as relation identifier */
413 211700 : pq_sendint32(out, RelationGetRelid(rel));
414 :
415 211700 : pq_sendbyte(out, 'N'); /* new tuple follows */
416 211700 : logicalrep_write_tuple(out, rel, newslot, binary, columns, include_gencols);
417 211700 : }
418 :
419 : /*
420 : * Read INSERT from stream.
421 : *
422 : * Fills the new tuple.
423 : */
424 : LogicalRepRelId
425 152622 : logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
426 : {
427 : char action;
428 : LogicalRepRelId relid;
429 :
430 : /* read the relation id */
431 152622 : relid = pq_getmsgint(in, 4);
432 :
433 152622 : action = pq_getmsgbyte(in);
434 152622 : if (action != 'N')
435 0 : elog(ERROR, "expected new tuple but got %d",
436 : action);
437 :
438 152622 : logicalrep_read_tuple(in, newtup);
439 :
440 152622 : return relid;
441 : }
442 :
443 : /*
444 : * Write UPDATE to the output stream.
445 : */
446 : void
447 68868 : logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
448 : TupleTableSlot *oldslot, TupleTableSlot *newslot,
449 : bool binary, Bitmapset *columns, bool include_gencols)
450 : {
451 68868 : pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
452 :
453 : Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
454 : rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
455 : rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
456 :
457 : /* transaction ID (if not valid, we're not streaming) */
458 68868 : if (TransactionIdIsValid(xid))
459 68464 : pq_sendint32(out, xid);
460 :
461 : /* use Oid as relation identifier */
462 68868 : pq_sendint32(out, RelationGetRelid(rel));
463 :
464 68868 : if (oldslot != NULL)
465 : {
466 250 : if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
467 118 : pq_sendbyte(out, 'O'); /* old tuple follows */
468 : else
469 132 : pq_sendbyte(out, 'K'); /* old key follows */
470 250 : logicalrep_write_tuple(out, rel, oldslot, binary, columns,
471 : include_gencols);
472 : }
473 :
474 68868 : pq_sendbyte(out, 'N'); /* new tuple follows */
475 68868 : logicalrep_write_tuple(out, rel, newslot, binary, columns, include_gencols);
476 68868 : }
477 :
478 : /*
479 : * Read UPDATE from stream.
480 : */
481 : LogicalRepRelId
482 63856 : logicalrep_read_update(StringInfo in, bool *has_oldtuple,
483 : LogicalRepTupleData *oldtup,
484 : LogicalRepTupleData *newtup)
485 : {
486 : char action;
487 : LogicalRepRelId relid;
488 :
489 : /* read the relation id */
490 63856 : relid = pq_getmsgint(in, 4);
491 :
492 : /* read and verify action */
493 63856 : action = pq_getmsgbyte(in);
494 63856 : if (action != 'K' && action != 'O' && action != 'N')
495 0 : elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
496 : action);
497 :
498 : /* check for old tuple */
499 63856 : if (action == 'K' || action == 'O')
500 : {
501 254 : logicalrep_read_tuple(in, oldtup);
502 254 : *has_oldtuple = true;
503 :
504 254 : action = pq_getmsgbyte(in);
505 : }
506 : else
507 63602 : *has_oldtuple = false;
508 :
509 : /* check for new tuple */
510 63856 : if (action != 'N')
511 0 : elog(ERROR, "expected action 'N', got %c",
512 : action);
513 :
514 63856 : logicalrep_read_tuple(in, newtup);
515 :
516 63856 : return relid;
517 : }
518 :
519 : /*
520 : * Write DELETE to the output stream.
521 : */
522 : void
523 83754 : logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
524 : TupleTableSlot *oldslot, bool binary,
525 : Bitmapset *columns, bool include_gencols)
526 : {
527 : Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
528 : rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
529 : rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
530 :
531 83754 : pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
532 :
533 : /* transaction ID (if not valid, we're not streaming) */
534 83754 : if (TransactionIdIsValid(xid))
535 83250 : pq_sendint32(out, xid);
536 :
537 : /* use Oid as relation identifier */
538 83754 : pq_sendint32(out, RelationGetRelid(rel));
539 :
540 83754 : if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
541 248 : pq_sendbyte(out, 'O'); /* old tuple follows */
542 : else
543 83506 : pq_sendbyte(out, 'K'); /* old key follows */
544 :
545 83754 : logicalrep_write_tuple(out, rel, oldslot, binary, columns, include_gencols);
546 83754 : }
547 :
548 : /*
549 : * Read DELETE from stream.
550 : *
551 : * Fills the old tuple.
552 : */
553 : LogicalRepRelId
554 80624 : logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
555 : {
556 : char action;
557 : LogicalRepRelId relid;
558 :
559 : /* read the relation id */
560 80624 : relid = pq_getmsgint(in, 4);
561 :
562 : /* read and verify action */
563 80624 : action = pq_getmsgbyte(in);
564 80624 : if (action != 'K' && action != 'O')
565 0 : elog(ERROR, "expected action 'O' or 'K', got %c", action);
566 :
567 80624 : logicalrep_read_tuple(in, oldtup);
568 :
569 80624 : return relid;
570 : }
571 :
572 : /*
573 : * Write TRUNCATE to the output stream.
574 : */
575 : void
576 20 : logicalrep_write_truncate(StringInfo out,
577 : TransactionId xid,
578 : int nrelids,
579 : Oid relids[],
580 : bool cascade, bool restart_seqs)
581 : {
582 : int i;
583 20 : uint8 flags = 0;
584 :
585 20 : pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
586 :
587 : /* transaction ID (if not valid, we're not streaming) */
588 20 : if (TransactionIdIsValid(xid))
589 0 : pq_sendint32(out, xid);
590 :
591 20 : pq_sendint32(out, nrelids);
592 :
593 : /* encode and send truncate flags */
594 20 : if (cascade)
595 0 : flags |= TRUNCATE_CASCADE;
596 20 : if (restart_seqs)
597 0 : flags |= TRUNCATE_RESTART_SEQS;
598 20 : pq_sendint8(out, flags);
599 :
600 52 : for (i = 0; i < nrelids; i++)
601 32 : pq_sendint32(out, relids[i]);
602 20 : }
603 :
604 : /*
605 : * Read TRUNCATE from stream.
606 : */
607 : List *
608 38 : logicalrep_read_truncate(StringInfo in,
609 : bool *cascade, bool *restart_seqs)
610 : {
611 : int i;
612 : int nrelids;
613 38 : List *relids = NIL;
614 : uint8 flags;
615 :
616 38 : nrelids = pq_getmsgint(in, 4);
617 :
618 : /* read and decode truncate flags */
619 38 : flags = pq_getmsgint(in, 1);
620 38 : *cascade = (flags & TRUNCATE_CASCADE) > 0;
621 38 : *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
622 :
623 94 : for (i = 0; i < nrelids; i++)
624 56 : relids = lappend_oid(relids, pq_getmsgint(in, 4));
625 :
626 38 : return relids;
627 : }
628 :
629 : /*
630 : * Write MESSAGE to stream
631 : */
632 : void
633 10 : logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
634 : bool transactional, const char *prefix, Size sz,
635 : const char *message)
636 : {
637 10 : uint8 flags = 0;
638 :
639 10 : pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
640 :
641 : /* encode and send message flags */
642 10 : if (transactional)
643 4 : flags |= MESSAGE_TRANSACTIONAL;
644 :
645 : /* transaction ID (if not valid, we're not streaming) */
646 10 : if (TransactionIdIsValid(xid))
647 0 : pq_sendint32(out, xid);
648 :
649 10 : pq_sendint8(out, flags);
650 10 : pq_sendint64(out, lsn);
651 10 : pq_sendstring(out, prefix);
652 10 : pq_sendint32(out, sz);
653 10 : pq_sendbytes(out, message, sz);
654 10 : }
655 :
656 : /*
657 : * Write relation description to the output stream.
658 : */
659 : void
660 656 : logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
661 : Bitmapset *columns, bool include_gencols)
662 : {
663 : char *relname;
664 :
665 656 : pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
666 :
667 : /* transaction ID (if not valid, we're not streaming) */
668 656 : if (TransactionIdIsValid(xid))
669 142 : pq_sendint32(out, xid);
670 :
671 : /* use Oid as relation identifier */
672 656 : pq_sendint32(out, RelationGetRelid(rel));
673 :
674 : /* send qualified relation name */
675 656 : logicalrep_write_namespace(out, RelationGetNamespace(rel));
676 656 : relname = RelationGetRelationName(rel);
677 656 : pq_sendstring(out, relname);
678 :
679 : /* send replica identity */
680 656 : pq_sendbyte(out, rel->rd_rel->relreplident);
681 :
682 : /* send the attribute info */
683 656 : logicalrep_write_attrs(out, rel, columns, include_gencols);
684 656 : }
685 :
686 : /*
687 : * Read the relation info from stream and return as LogicalRepRelation.
688 : */
689 : LogicalRepRelation *
690 766 : logicalrep_read_rel(StringInfo in)
691 : {
692 766 : LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
693 :
694 766 : rel->remoteid = pq_getmsgint(in, 4);
695 :
696 : /* Read relation name from stream */
697 766 : rel->nspname = pstrdup(logicalrep_read_namespace(in));
698 766 : rel->relname = pstrdup(pq_getmsgstring(in));
699 :
700 : /* Read the replica identity. */
701 766 : rel->replident = pq_getmsgbyte(in);
702 :
703 : /* Get attribute description */
704 766 : logicalrep_read_attrs(in, rel);
705 :
706 766 : return rel;
707 : }
708 :
709 : /*
710 : * Write type info to the output stream.
711 : *
712 : * This function will always write base type info.
713 : */
714 : void
715 36 : logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
716 : {
717 36 : Oid basetypoid = getBaseType(typoid);
718 : HeapTuple tup;
719 : Form_pg_type typtup;
720 :
721 36 : pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
722 :
723 : /* transaction ID (if not valid, we're not streaming) */
724 36 : if (TransactionIdIsValid(xid))
725 0 : pq_sendint32(out, xid);
726 :
727 36 : tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
728 36 : if (!HeapTupleIsValid(tup))
729 0 : elog(ERROR, "cache lookup failed for type %u", basetypoid);
730 36 : typtup = (Form_pg_type) GETSTRUCT(tup);
731 :
732 : /* use Oid as type identifier */
733 36 : pq_sendint32(out, typoid);
734 :
735 : /* send qualified type name */
736 36 : logicalrep_write_namespace(out, typtup->typnamespace);
737 36 : pq_sendstring(out, NameStr(typtup->typname));
738 :
739 36 : ReleaseSysCache(tup);
740 36 : }
741 :
742 : /*
743 : * Read type info from the output stream.
744 : */
745 : void
746 36 : logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
747 : {
748 36 : ltyp->remoteid = pq_getmsgint(in, 4);
749 :
750 : /* Read type name from stream */
751 36 : ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
752 36 : ltyp->typname = pstrdup(pq_getmsgstring(in));
753 36 : }
754 :
755 : /*
756 : * Write a tuple to the outputstream, in the most efficient format possible.
757 : */
758 : static void
759 364572 : logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
760 : bool binary, Bitmapset *columns, bool include_gencols)
761 : {
762 : TupleDesc desc;
763 : Datum *values;
764 : bool *isnull;
765 : int i;
766 364572 : uint16 nliveatts = 0;
767 :
768 364572 : desc = RelationGetDescr(rel);
769 :
770 1096600 : for (i = 0; i < desc->natts; i++)
771 : {
772 732028 : Form_pg_attribute att = TupleDescAttr(desc, i);
773 :
774 732028 : if (!logicalrep_should_publish_column(att, columns, include_gencols))
775 266 : continue;
776 :
777 731762 : nliveatts++;
778 : }
779 364572 : pq_sendint16(out, nliveatts);
780 :
781 364572 : slot_getallattrs(slot);
782 364572 : values = slot->tts_values;
783 364572 : isnull = slot->tts_isnull;
784 :
785 : /* Write the values */
786 1096600 : for (i = 0; i < desc->natts; i++)
787 : {
788 : HeapTuple typtup;
789 : Form_pg_type typclass;
790 732028 : Form_pg_attribute att = TupleDescAttr(desc, i);
791 :
792 732028 : if (!logicalrep_should_publish_column(att, columns, include_gencols))
793 266 : continue;
794 :
795 731762 : if (isnull[i])
796 : {
797 103820 : pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
798 103820 : continue;
799 : }
800 :
801 627942 : if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
802 : {
803 : /*
804 : * Unchanged toasted datum. (Note that we don't promise to detect
805 : * unchanged data in general; this is just a cheap check to avoid
806 : * sending large values unnecessarily.)
807 : */
808 6 : pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
809 6 : continue;
810 : }
811 :
812 627936 : typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
813 627936 : if (!HeapTupleIsValid(typtup))
814 0 : elog(ERROR, "cache lookup failed for type %u", att->atttypid);
815 627936 : typclass = (Form_pg_type) GETSTRUCT(typtup);
816 :
817 : /*
818 : * Send in binary if requested and type has suitable send function.
819 : */
820 627936 : if (binary && OidIsValid(typclass->typsend))
821 230086 : {
822 : bytea *outputbytes;
823 : int len;
824 :
825 230086 : pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
826 230086 : outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
827 230086 : len = VARSIZE(outputbytes) - VARHDRSZ;
828 230086 : pq_sendint(out, len, 4); /* length */
829 230086 : pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
830 230086 : pfree(outputbytes);
831 : }
832 : else
833 : {
834 : char *outputstr;
835 :
836 397850 : pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
837 397850 : outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
838 397850 : pq_sendcountedtext(out, outputstr, strlen(outputstr));
839 397850 : pfree(outputstr);
840 : }
841 :
842 627936 : ReleaseSysCache(typtup);
843 : }
844 364572 : }
845 :
846 : /*
847 : * Read tuple in logical replication format from stream.
848 : */
849 : static void
850 297356 : logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
851 : {
852 : int i;
853 : int natts;
854 :
855 : /* Get number of attributes */
856 297356 : natts = pq_getmsgint(in, 2);
857 :
858 : /* Allocate space for per-column values; zero out unused StringInfoDatas */
859 297356 : tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
860 297356 : tuple->colstatus = (char *) palloc(natts * sizeof(char));
861 297356 : tuple->ncols = natts;
862 :
863 : /* Read the data */
864 903752 : for (i = 0; i < natts; i++)
865 : {
866 : char *buff;
867 : char kind;
868 : int len;
869 606396 : StringInfo value = &tuple->colvalues[i];
870 :
871 606396 : kind = pq_getmsgbyte(in);
872 606396 : tuple->colstatus[i] = kind;
873 :
874 606396 : switch (kind)
875 : {
876 100712 : case LOGICALREP_COLUMN_NULL:
877 : /* nothing more to do */
878 100712 : break;
879 6 : case LOGICALREP_COLUMN_UNCHANGED:
880 : /* we don't receive the value of an unchanged column */
881 6 : break;
882 505678 : case LOGICALREP_COLUMN_TEXT:
883 : case LOGICALREP_COLUMN_BINARY:
884 505678 : len = pq_getmsgint(in, 4); /* read length */
885 :
886 : /* and data */
887 505678 : buff = palloc(len + 1);
888 505678 : pq_copymsgbytes(in, buff, len);
889 :
890 : /*
891 : * NUL termination is required for LOGICALREP_COLUMN_TEXT mode
892 : * as input functions require that. For
893 : * LOGICALREP_COLUMN_BINARY it's not technically required, but
894 : * it's harmless.
895 : */
896 505678 : buff[len] = '\0';
897 :
898 505678 : initStringInfoFromString(value, buff, len);
899 505678 : break;
900 0 : default:
901 0 : elog(ERROR, "unrecognized data representation type '%c'", kind);
902 : }
903 : }
904 297356 : }
905 :
906 : /*
907 : * Write relation attribute metadata to the stream.
908 : */
909 : static void
910 656 : logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns,
911 : bool include_gencols)
912 : {
913 : TupleDesc desc;
914 : int i;
915 656 : uint16 nliveatts = 0;
916 656 : Bitmapset *idattrs = NULL;
917 : bool replidentfull;
918 :
919 656 : desc = RelationGetDescr(rel);
920 :
921 : /* send number of live attributes */
922 1980 : for (i = 0; i < desc->natts; i++)
923 : {
924 1324 : Form_pg_attribute att = TupleDescAttr(desc, i);
925 :
926 1324 : if (!logicalrep_should_publish_column(att, columns, include_gencols))
927 138 : continue;
928 :
929 1186 : nliveatts++;
930 : }
931 656 : pq_sendint16(out, nliveatts);
932 :
933 : /* fetch bitmap of REPLICATION IDENTITY attributes */
934 656 : replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
935 656 : if (!replidentfull)
936 558 : idattrs = RelationGetIdentityKeyBitmap(rel);
937 :
938 : /* send the attributes */
939 1980 : for (i = 0; i < desc->natts; i++)
940 : {
941 1324 : Form_pg_attribute att = TupleDescAttr(desc, i);
942 1324 : uint8 flags = 0;
943 :
944 1324 : if (!logicalrep_should_publish_column(att, columns, include_gencols))
945 138 : continue;
946 :
947 : /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
948 2234 : if (replidentfull ||
949 1048 : bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
950 : idattrs))
951 588 : flags |= LOGICALREP_IS_REPLICA_IDENTITY;
952 :
953 1186 : pq_sendbyte(out, flags);
954 :
955 : /* attribute name */
956 1186 : pq_sendstring(out, NameStr(att->attname));
957 :
958 : /* attribute type id */
959 1186 : pq_sendint32(out, (int) att->atttypid);
960 :
961 : /* attribute mode */
962 1186 : pq_sendint32(out, att->atttypmod);
963 : }
964 :
965 656 : bms_free(idattrs);
966 656 : }
967 :
968 : /*
969 : * Read relation attribute metadata from the stream.
970 : */
971 : static void
972 766 : logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
973 : {
974 : int i;
975 : int natts;
976 : char **attnames;
977 : Oid *atttyps;
978 766 : Bitmapset *attkeys = NULL;
979 :
980 766 : natts = pq_getmsgint(in, 2);
981 766 : attnames = palloc(natts * sizeof(char *));
982 766 : atttyps = palloc(natts * sizeof(Oid));
983 :
984 : /* read the attributes */
985 2136 : for (i = 0; i < natts; i++)
986 : {
987 : uint8 flags;
988 :
989 : /* Check for replica identity column */
990 1370 : flags = pq_getmsgbyte(in);
991 1370 : if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
992 668 : attkeys = bms_add_member(attkeys, i);
993 :
994 : /* attribute name */
995 1370 : attnames[i] = pstrdup(pq_getmsgstring(in));
996 :
997 : /* attribute type id */
998 1370 : atttyps[i] = (Oid) pq_getmsgint(in, 4);
999 :
1000 : /* we ignore attribute mode for now */
1001 1370 : (void) pq_getmsgint(in, 4);
1002 : }
1003 :
1004 766 : rel->attnames = attnames;
1005 766 : rel->atttyps = atttyps;
1006 766 : rel->attkeys = attkeys;
1007 766 : rel->natts = natts;
1008 766 : }
1009 :
1010 : /*
1011 : * Write the namespace name or empty string for pg_catalog (to save space).
1012 : */
1013 : static void
1014 692 : logicalrep_write_namespace(StringInfo out, Oid nspid)
1015 : {
1016 692 : if (nspid == PG_CATALOG_NAMESPACE)
1017 2 : pq_sendbyte(out, '\0');
1018 : else
1019 : {
1020 690 : char *nspname = get_namespace_name(nspid);
1021 :
1022 690 : if (nspname == NULL)
1023 0 : elog(ERROR, "cache lookup failed for namespace %u",
1024 : nspid);
1025 :
1026 690 : pq_sendstring(out, nspname);
1027 : }
1028 692 : }
1029 :
1030 : /*
1031 : * Read the namespace name while treating empty string as pg_catalog.
1032 : */
1033 : static const char *
1034 802 : logicalrep_read_namespace(StringInfo in)
1035 : {
1036 802 : const char *nspname = pq_getmsgstring(in);
1037 :
1038 802 : if (nspname[0] == '\0')
1039 2 : nspname = "pg_catalog";
1040 :
1041 802 : return nspname;
1042 : }
1043 :
1044 : /*
1045 : * Write the information for the start stream message to the output stream.
1046 : */
1047 : void
1048 1274 : logicalrep_write_stream_start(StringInfo out,
1049 : TransactionId xid, bool first_segment)
1050 : {
1051 1274 : pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
1052 :
1053 : Assert(TransactionIdIsValid(xid));
1054 :
1055 : /* transaction ID (we're starting to stream, so must be valid) */
1056 1274 : pq_sendint32(out, xid);
1057 :
1058 : /* 1 if this is the first streaming segment for this xid */
1059 1274 : pq_sendbyte(out, first_segment ? 1 : 0);
1060 1274 : }
1061 :
1062 : /*
1063 : * Read the information about the start stream message from output stream.
1064 : */
1065 : TransactionId
1066 1676 : logicalrep_read_stream_start(StringInfo in, bool *first_segment)
1067 : {
1068 : TransactionId xid;
1069 :
1070 : Assert(first_segment);
1071 :
1072 1676 : xid = pq_getmsgint(in, 4);
1073 1676 : *first_segment = (pq_getmsgbyte(in) == 1);
1074 :
1075 1676 : return xid;
1076 : }
1077 :
1078 : /*
1079 : * Write the stop stream message to the output stream.
1080 : */
1081 : void
1082 1274 : logicalrep_write_stream_stop(StringInfo out)
1083 : {
1084 1274 : pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_STOP);
1085 1274 : }
1086 :
1087 : /*
1088 : * Write STREAM COMMIT to the output stream.
1089 : */
1090 : void
1091 92 : logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
1092 : XLogRecPtr commit_lsn)
1093 : {
1094 92 : uint8 flags = 0;
1095 :
1096 92 : pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
1097 :
1098 : Assert(TransactionIdIsValid(txn->xid));
1099 :
1100 : /* transaction ID */
1101 92 : pq_sendint32(out, txn->xid);
1102 :
1103 : /* send the flags field (unused for now) */
1104 92 : pq_sendbyte(out, flags);
1105 :
1106 : /* send fields */
1107 92 : pq_sendint64(out, commit_lsn);
1108 92 : pq_sendint64(out, txn->end_lsn);
1109 92 : pq_sendint64(out, txn->xact_time.commit_time);
1110 92 : }
1111 :
1112 : /*
1113 : * Read STREAM COMMIT from the output stream.
1114 : */
1115 : TransactionId
1116 122 : logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
1117 : {
1118 : TransactionId xid;
1119 : uint8 flags;
1120 :
1121 122 : xid = pq_getmsgint(in, 4);
1122 :
1123 : /* read flags (unused for now) */
1124 122 : flags = pq_getmsgbyte(in);
1125 :
1126 122 : if (flags != 0)
1127 0 : elog(ERROR, "unrecognized flags %u in commit message", flags);
1128 :
1129 : /* read fields */
1130 122 : commit_data->commit_lsn = pq_getmsgint64(in);
1131 122 : commit_data->end_lsn = pq_getmsgint64(in);
1132 122 : commit_data->committime = pq_getmsgint64(in);
1133 :
1134 122 : return xid;
1135 : }
1136 :
1137 : /*
1138 : * Write STREAM ABORT to the output stream. Note that xid and subxid will be
1139 : * same for the top-level transaction abort.
1140 : *
1141 : * If write_abort_info is true, send the abort_lsn and abort_time fields,
1142 : * otherwise don't.
1143 : */
1144 : void
1145 52 : logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
1146 : TransactionId subxid, XLogRecPtr abort_lsn,
1147 : TimestampTz abort_time, bool write_abort_info)
1148 : {
1149 52 : pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
1150 :
1151 : Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
1152 :
1153 : /* transaction ID */
1154 52 : pq_sendint32(out, xid);
1155 52 : pq_sendint32(out, subxid);
1156 :
1157 52 : if (write_abort_info)
1158 : {
1159 24 : pq_sendint64(out, abort_lsn);
1160 24 : pq_sendint64(out, abort_time);
1161 : }
1162 52 : }
1163 :
1164 : /*
1165 : * Read STREAM ABORT from the output stream.
1166 : *
1167 : * If read_abort_info is true, read the abort_lsn and abort_time fields,
1168 : * otherwise don't.
1169 : */
1170 : void
1171 76 : logicalrep_read_stream_abort(StringInfo in,
1172 : LogicalRepStreamAbortData *abort_data,
1173 : bool read_abort_info)
1174 : {
1175 : Assert(abort_data);
1176 :
1177 76 : abort_data->xid = pq_getmsgint(in, 4);
1178 76 : abort_data->subxid = pq_getmsgint(in, 4);
1179 :
1180 76 : if (read_abort_info)
1181 : {
1182 48 : abort_data->abort_lsn = pq_getmsgint64(in);
1183 48 : abort_data->abort_time = pq_getmsgint64(in);
1184 : }
1185 : else
1186 : {
1187 28 : abort_data->abort_lsn = InvalidXLogRecPtr;
1188 28 : abort_data->abort_time = 0;
1189 : }
1190 76 : }
1191 :
1192 : /*
1193 : * Get string representing LogicalRepMsgType.
1194 : */
1195 : const char *
1196 698 : logicalrep_message_type(LogicalRepMsgType action)
1197 : {
1198 : static char err_unknown[20];
1199 :
1200 698 : switch (action)
1201 : {
1202 2 : case LOGICAL_REP_MSG_BEGIN:
1203 2 : return "BEGIN";
1204 2 : case LOGICAL_REP_MSG_COMMIT:
1205 2 : return "COMMIT";
1206 0 : case LOGICAL_REP_MSG_ORIGIN:
1207 0 : return "ORIGIN";
1208 66 : case LOGICAL_REP_MSG_INSERT:
1209 66 : return "INSERT";
1210 30 : case LOGICAL_REP_MSG_UPDATE:
1211 30 : return "UPDATE";
1212 22 : case LOGICAL_REP_MSG_DELETE:
1213 22 : return "DELETE";
1214 0 : case LOGICAL_REP_MSG_TRUNCATE:
1215 0 : return "TRUNCATE";
1216 4 : case LOGICAL_REP_MSG_RELATION:
1217 4 : return "RELATION";
1218 0 : case LOGICAL_REP_MSG_TYPE:
1219 0 : return "TYPE";
1220 0 : case LOGICAL_REP_MSG_MESSAGE:
1221 0 : return "MESSAGE";
1222 2 : case LOGICAL_REP_MSG_BEGIN_PREPARE:
1223 2 : return "BEGIN PREPARE";
1224 4 : case LOGICAL_REP_MSG_PREPARE:
1225 4 : return "PREPARE";
1226 0 : case LOGICAL_REP_MSG_COMMIT_PREPARED:
1227 0 : return "COMMIT PREPARED";
1228 0 : case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
1229 0 : return "ROLLBACK PREPARED";
1230 26 : case LOGICAL_REP_MSG_STREAM_START:
1231 26 : return "STREAM START";
1232 458 : case LOGICAL_REP_MSG_STREAM_STOP:
1233 458 : return "STREAM STOP";
1234 40 : case LOGICAL_REP_MSG_STREAM_COMMIT:
1235 40 : return "STREAM COMMIT";
1236 38 : case LOGICAL_REP_MSG_STREAM_ABORT:
1237 38 : return "STREAM ABORT";
1238 4 : case LOGICAL_REP_MSG_STREAM_PREPARE:
1239 4 : return "STREAM PREPARE";
1240 : }
1241 :
1242 : /*
1243 : * This message provides context in the error raised when applying a
1244 : * logical message. So we can't throw an error here. Return an unknown
1245 : * indicator value so that the original error is still reported.
1246 : */
1247 0 : snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);
1248 :
1249 0 : return err_unknown;
1250 : }
1251 :
1252 : /*
1253 : * Check if the column 'att' of a table should be published.
1254 : *
1255 : * 'columns' represents the publication column list (if any) for that table.
1256 : *
1257 : * 'include_gencols' flag indicates whether generated columns should be
1258 : * published when there is no column list. Typically, this will have the same
1259 : * value as the 'publish_generated_columns' publication parameter.
1260 : *
1261 : * Note that generated columns can be published only when present in a
1262 : * publication column list, or when include_gencols is true.
1263 : */
1264 : bool
1265 1468028 : logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns,
1266 : bool include_gencols)
1267 : {
1268 1468028 : if (att->attisdropped)
1269 102 : return false;
1270 :
1271 : /* If a column list is provided, publish only the cols in that list. */
1272 1467926 : if (columns)
1273 1888 : return bms_is_member(att->attnum, columns);
1274 :
1275 : /* All non-generated columns are always published. */
1276 1466038 : return att->attgenerated ? include_gencols : true;
1277 : }
|