Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * proto.c
4 : * logical replication protocol functions
5 : *
6 : * Copyright (c) 2015-2024, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/logical/proto.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/sysattr.h"
16 : #include "catalog/pg_namespace.h"
17 : #include "catalog/pg_type.h"
18 : #include "libpq/pqformat.h"
19 : #include "replication/logicalproto.h"
20 : #include "utils/lsyscache.h"
21 : #include "utils/syscache.h"
22 :
/*
 * Protocol message flags.
 *
 * Each group below belongs to the flags byte of a different message, so
 * identical bit values in different groups do not conflict.
 */

/* per-attribute flag in the attribute list of a relation message */
#define LOGICALREP_IS_REPLICA_IDENTITY	(1 << 0)

/* flag of the logical decoding MESSAGE message */
#define MESSAGE_TRANSACTIONAL			(1 << 0)

/* flags of the TRUNCATE message */
#define TRUNCATE_CASCADE				(1 << 0)
#define TRUNCATE_RESTART_SEQS			(1 << 1)
31 :
32 : static void logicalrep_write_attrs(StringInfo out, Relation rel,
33 : Bitmapset *columns);
34 : static void logicalrep_write_tuple(StringInfo out, Relation rel,
35 : TupleTableSlot *slot,
36 : bool binary, Bitmapset *columns);
37 : static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
38 : static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
39 :
40 : static void logicalrep_write_namespace(StringInfo out, Oid nspid);
41 : static const char *logicalrep_read_namespace(StringInfo in);
42 :
43 : /*
44 : * Check if a column is covered by a column list.
45 : *
46 : * Need to be careful about NULL, which is treated as a column list covering
47 : * all columns.
48 : */
49 : static bool
50 1465868 : column_in_column_list(int attnum, Bitmapset *columns)
51 : {
52 1465868 : return (columns == NULL || bms_is_member(attnum, columns));
53 : }
54 :
55 :
56 : /*
57 : * Write BEGIN to the output stream.
58 : */
59 : void
60 708 : logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
61 : {
62 708 : pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
63 :
64 : /* fixed fields */
65 708 : pq_sendint64(out, txn->final_lsn);
66 708 : pq_sendint64(out, txn->xact_time.commit_time);
67 708 : pq_sendint32(out, txn->xid);
68 708 : }
69 :
70 : /*
71 : * Read transaction BEGIN from the stream.
72 : */
73 : void
74 814 : logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
75 : {
76 : /* read fields */
77 814 : begin_data->final_lsn = pq_getmsgint64(in);
78 814 : if (begin_data->final_lsn == InvalidXLogRecPtr)
79 0 : elog(ERROR, "final_lsn not set in begin message");
80 814 : begin_data->committime = pq_getmsgint64(in);
81 814 : begin_data->xid = pq_getmsgint(in, 4);
82 814 : }
83 :
84 :
85 : /*
86 : * Write COMMIT to the output stream.
87 : */
88 : void
89 708 : logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
90 : XLogRecPtr commit_lsn)
91 : {
92 708 : uint8 flags = 0;
93 :
94 708 : pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
95 :
96 : /* send the flags field (unused for now) */
97 708 : pq_sendbyte(out, flags);
98 :
99 : /* send fields */
100 708 : pq_sendint64(out, commit_lsn);
101 708 : pq_sendint64(out, txn->end_lsn);
102 708 : pq_sendint64(out, txn->xact_time.commit_time);
103 708 : }
104 :
105 : /*
106 : * Read transaction COMMIT from the stream.
107 : */
108 : void
109 760 : logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
110 : {
111 : /* read flags (unused for now) */
112 760 : uint8 flags = pq_getmsgbyte(in);
113 :
114 760 : if (flags != 0)
115 0 : elog(ERROR, "unrecognized flags %u in commit message", flags);
116 :
117 : /* read fields */
118 760 : commit_data->commit_lsn = pq_getmsgint64(in);
119 760 : commit_data->end_lsn = pq_getmsgint64(in);
120 760 : commit_data->committime = pq_getmsgint64(in);
121 760 : }
122 :
123 : /*
124 : * Write BEGIN PREPARE to the output stream.
125 : */
126 : void
127 36 : logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
128 : {
129 36 : pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE);
130 :
131 : /* fixed fields */
132 36 : pq_sendint64(out, txn->final_lsn);
133 36 : pq_sendint64(out, txn->end_lsn);
134 36 : pq_sendint64(out, txn->xact_time.prepare_time);
135 36 : pq_sendint32(out, txn->xid);
136 :
137 : /* send gid */
138 36 : pq_sendstring(out, txn->gid);
139 36 : }
140 :
141 : /*
142 : * Read transaction BEGIN PREPARE from the stream.
143 : */
144 : void
145 28 : logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
146 : {
147 : /* read fields */
148 28 : begin_data->prepare_lsn = pq_getmsgint64(in);
149 28 : if (begin_data->prepare_lsn == InvalidXLogRecPtr)
150 0 : elog(ERROR, "prepare_lsn not set in begin prepare message");
151 28 : begin_data->end_lsn = pq_getmsgint64(in);
152 28 : if (begin_data->end_lsn == InvalidXLogRecPtr)
153 0 : elog(ERROR, "end_lsn not set in begin prepare message");
154 28 : begin_data->prepare_time = pq_getmsgint64(in);
155 28 : begin_data->xid = pq_getmsgint(in, 4);
156 :
157 : /* read gid (copy it into a pre-allocated buffer) */
158 28 : strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
159 28 : }
160 :
161 : /*
162 : * The core functionality for logicalrep_write_prepare and
163 : * logicalrep_write_stream_prepare.
164 : */
165 : static void
166 64 : logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
167 : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
168 : {
169 64 : uint8 flags = 0;
170 :
171 64 : pq_sendbyte(out, type);
172 :
173 : /*
174 : * This should only ever happen for two-phase commit transactions, in
175 : * which case we expect to have a valid GID.
176 : */
177 : Assert(txn->gid != NULL);
178 : Assert(rbtxn_prepared(txn));
179 : Assert(TransactionIdIsValid(txn->xid));
180 :
181 : /* send the flags field */
182 64 : pq_sendbyte(out, flags);
183 :
184 : /* send fields */
185 64 : pq_sendint64(out, prepare_lsn);
186 64 : pq_sendint64(out, txn->end_lsn);
187 64 : pq_sendint64(out, txn->xact_time.prepare_time);
188 64 : pq_sendint32(out, txn->xid);
189 :
190 : /* send gid */
191 64 : pq_sendstring(out, txn->gid);
192 64 : }
193 :
194 : /*
195 : * Write PREPARE to the output stream.
196 : */
197 : void
198 36 : logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
199 : XLogRecPtr prepare_lsn)
200 : {
201 36 : logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
202 : txn, prepare_lsn);
203 36 : }
204 :
205 : /*
206 : * The core functionality for logicalrep_read_prepare and
207 : * logicalrep_read_stream_prepare.
208 : */
209 : static void
210 48 : logicalrep_read_prepare_common(StringInfo in, char *msgtype,
211 : LogicalRepPreparedTxnData *prepare_data)
212 : {
213 : /* read flags */
214 48 : uint8 flags = pq_getmsgbyte(in);
215 :
216 48 : if (flags != 0)
217 0 : elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
218 :
219 : /* read fields */
220 48 : prepare_data->prepare_lsn = pq_getmsgint64(in);
221 48 : if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
222 0 : elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
223 48 : prepare_data->end_lsn = pq_getmsgint64(in);
224 48 : if (prepare_data->end_lsn == InvalidXLogRecPtr)
225 0 : elog(ERROR, "end_lsn is not set in %s message", msgtype);
226 48 : prepare_data->prepare_time = pq_getmsgint64(in);
227 48 : prepare_data->xid = pq_getmsgint(in, 4);
228 48 : if (prepare_data->xid == InvalidTransactionId)
229 0 : elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
230 :
231 : /* read gid (copy it into a pre-allocated buffer) */
232 48 : strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
233 48 : }
234 :
235 : /*
236 : * Read transaction PREPARE from the stream.
237 : */
238 : void
239 26 : logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
240 : {
241 26 : logicalrep_read_prepare_common(in, "prepare", prepare_data);
242 26 : }
243 :
244 : /*
245 : * Write COMMIT PREPARED to the output stream.
246 : */
247 : void
248 46 : logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
249 : XLogRecPtr commit_lsn)
250 : {
251 46 : uint8 flags = 0;
252 :
253 46 : pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
254 :
255 : /*
256 : * This should only ever happen for two-phase commit transactions, in
257 : * which case we expect to have a valid GID.
258 : */
259 : Assert(txn->gid != NULL);
260 :
261 : /* send the flags field */
262 46 : pq_sendbyte(out, flags);
263 :
264 : /* send fields */
265 46 : pq_sendint64(out, commit_lsn);
266 46 : pq_sendint64(out, txn->end_lsn);
267 46 : pq_sendint64(out, txn->xact_time.commit_time);
268 46 : pq_sendint32(out, txn->xid);
269 :
270 : /* send gid */
271 46 : pq_sendstring(out, txn->gid);
272 46 : }
273 :
274 : /*
275 : * Read transaction COMMIT PREPARED from the stream.
276 : */
277 : void
278 38 : logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
279 : {
280 : /* read flags */
281 38 : uint8 flags = pq_getmsgbyte(in);
282 :
283 38 : if (flags != 0)
284 0 : elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
285 :
286 : /* read fields */
287 38 : prepare_data->commit_lsn = pq_getmsgint64(in);
288 38 : if (prepare_data->commit_lsn == InvalidXLogRecPtr)
289 0 : elog(ERROR, "commit_lsn is not set in commit prepared message");
290 38 : prepare_data->end_lsn = pq_getmsgint64(in);
291 38 : if (prepare_data->end_lsn == InvalidXLogRecPtr)
292 0 : elog(ERROR, "end_lsn is not set in commit prepared message");
293 38 : prepare_data->commit_time = pq_getmsgint64(in);
294 38 : prepare_data->xid = pq_getmsgint(in, 4);
295 :
296 : /* read gid (copy it into a pre-allocated buffer) */
297 38 : strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
298 38 : }
299 :
300 : /*
301 : * Write ROLLBACK PREPARED to the output stream.
302 : */
303 : void
304 18 : logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
305 : XLogRecPtr prepare_end_lsn,
306 : TimestampTz prepare_time)
307 : {
308 18 : uint8 flags = 0;
309 :
310 18 : pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
311 :
312 : /*
313 : * This should only ever happen for two-phase commit transactions, in
314 : * which case we expect to have a valid GID.
315 : */
316 : Assert(txn->gid != NULL);
317 :
318 : /* send the flags field */
319 18 : pq_sendbyte(out, flags);
320 :
321 : /* send fields */
322 18 : pq_sendint64(out, prepare_end_lsn);
323 18 : pq_sendint64(out, txn->end_lsn);
324 18 : pq_sendint64(out, prepare_time);
325 18 : pq_sendint64(out, txn->xact_time.commit_time);
326 18 : pq_sendint32(out, txn->xid);
327 :
328 : /* send gid */
329 18 : pq_sendstring(out, txn->gid);
330 18 : }
331 :
332 : /*
333 : * Read transaction ROLLBACK PREPARED from the stream.
334 : */
335 : void
336 10 : logicalrep_read_rollback_prepared(StringInfo in,
337 : LogicalRepRollbackPreparedTxnData *rollback_data)
338 : {
339 : /* read flags */
340 10 : uint8 flags = pq_getmsgbyte(in);
341 :
342 10 : if (flags != 0)
343 0 : elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
344 :
345 : /* read fields */
346 10 : rollback_data->prepare_end_lsn = pq_getmsgint64(in);
347 10 : if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
348 0 : elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
349 10 : rollback_data->rollback_end_lsn = pq_getmsgint64(in);
350 10 : if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
351 0 : elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
352 10 : rollback_data->prepare_time = pq_getmsgint64(in);
353 10 : rollback_data->rollback_time = pq_getmsgint64(in);
354 10 : rollback_data->xid = pq_getmsgint(in, 4);
355 :
356 : /* read gid (copy it into a pre-allocated buffer) */
357 10 : strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
358 10 : }
359 :
360 : /*
361 : * Write STREAM PREPARE to the output stream.
362 : */
363 : void
364 28 : logicalrep_write_stream_prepare(StringInfo out,
365 : ReorderBufferTXN *txn,
366 : XLogRecPtr prepare_lsn)
367 : {
368 28 : logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE,
369 : txn, prepare_lsn);
370 28 : }
371 :
372 : /*
373 : * Read STREAM PREPARE from the stream.
374 : */
375 : void
376 22 : logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
377 : {
378 22 : logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
379 22 : }
380 :
381 : /*
382 : * Write ORIGIN to the output stream.
383 : */
384 : void
385 18 : logicalrep_write_origin(StringInfo out, const char *origin,
386 : XLogRecPtr origin_lsn)
387 : {
388 18 : pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
389 :
390 : /* fixed fields */
391 18 : pq_sendint64(out, origin_lsn);
392 :
393 : /* origin string */
394 18 : pq_sendstring(out, origin);
395 18 : }
396 :
397 : /*
398 : * Read ORIGIN from the output stream.
399 : */
400 : char *
401 0 : logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
402 : {
403 : /* fixed fields */
404 0 : *origin_lsn = pq_getmsgint64(in);
405 :
406 : /* return origin */
407 0 : return pstrdup(pq_getmsgstring(in));
408 : }
409 :
410 : /*
411 : * Write INSERT to the output stream.
412 : */
413 : void
414 211648 : logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
415 : TupleTableSlot *newslot, bool binary, Bitmapset *columns)
416 : {
417 211648 : pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
418 :
419 : /* transaction ID (if not valid, we're not streaming) */
420 211648 : if (TransactionIdIsValid(xid))
421 200168 : pq_sendint32(out, xid);
422 :
423 : /* use Oid as relation identifier */
424 211648 : pq_sendint32(out, RelationGetRelid(rel));
425 :
426 211648 : pq_sendbyte(out, 'N'); /* new tuple follows */
427 211648 : logicalrep_write_tuple(out, rel, newslot, binary, columns);
428 211648 : }
429 :
430 : /*
431 : * Read INSERT from stream.
432 : *
433 : * Fills the new tuple.
434 : */
435 : LogicalRepRelId
436 152542 : logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
437 : {
438 : char action;
439 : LogicalRepRelId relid;
440 :
441 : /* read the relation id */
442 152542 : relid = pq_getmsgint(in, 4);
443 :
444 152542 : action = pq_getmsgbyte(in);
445 152542 : if (action != 'N')
446 0 : elog(ERROR, "expected new tuple but got %d",
447 : action);
448 :
449 152542 : logicalrep_read_tuple(in, newtup);
450 :
451 152542 : return relid;
452 : }
453 :
454 : /*
455 : * Write UPDATE to the output stream.
456 : */
457 : void
458 68836 : logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
459 : TupleTableSlot *oldslot, TupleTableSlot *newslot,
460 : bool binary, Bitmapset *columns)
461 : {
462 68836 : pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
463 :
464 : Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
465 : rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
466 : rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
467 :
468 : /* transaction ID (if not valid, we're not streaming) */
469 68836 : if (TransactionIdIsValid(xid))
470 68464 : pq_sendint32(out, xid);
471 :
472 : /* use Oid as relation identifier */
473 68836 : pq_sendint32(out, RelationGetRelid(rel));
474 :
475 68836 : if (oldslot != NULL)
476 : {
477 222 : if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
478 102 : pq_sendbyte(out, 'O'); /* old tuple follows */
479 : else
480 120 : pq_sendbyte(out, 'K'); /* old key follows */
481 222 : logicalrep_write_tuple(out, rel, oldslot, binary, columns);
482 : }
483 :
484 68836 : pq_sendbyte(out, 'N'); /* new tuple follows */
485 68836 : logicalrep_write_tuple(out, rel, newslot, binary, columns);
486 68836 : }
487 :
488 : /*
489 : * Read UPDATE from stream.
490 : */
491 : LogicalRepRelId
492 63844 : logicalrep_read_update(StringInfo in, bool *has_oldtuple,
493 : LogicalRepTupleData *oldtup,
494 : LogicalRepTupleData *newtup)
495 : {
496 : char action;
497 : LogicalRepRelId relid;
498 :
499 : /* read the relation id */
500 63844 : relid = pq_getmsgint(in, 4);
501 :
502 : /* read and verify action */
503 63844 : action = pq_getmsgbyte(in);
504 63844 : if (action != 'K' && action != 'O' && action != 'N')
505 0 : elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
506 : action);
507 :
508 : /* check for old tuple */
509 63844 : if (action == 'K' || action == 'O')
510 : {
511 246 : logicalrep_read_tuple(in, oldtup);
512 246 : *has_oldtuple = true;
513 :
514 246 : action = pq_getmsgbyte(in);
515 : }
516 : else
517 63598 : *has_oldtuple = false;
518 :
519 : /* check for new tuple */
520 63844 : if (action != 'N')
521 0 : elog(ERROR, "expected action 'N', got %c",
522 : action);
523 :
524 63844 : logicalrep_read_tuple(in, newtup);
525 :
526 63844 : return relid;
527 : }
528 :
529 : /*
530 : * Write DELETE to the output stream.
531 : */
532 : void
533 83720 : logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
534 : TupleTableSlot *oldslot, bool binary,
535 : Bitmapset *columns)
536 : {
537 : Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
538 : rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
539 : rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
540 :
541 83720 : pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
542 :
543 : /* transaction ID (if not valid, we're not streaming) */
544 83720 : if (TransactionIdIsValid(xid))
545 83250 : pq_sendint32(out, xid);
546 :
547 : /* use Oid as relation identifier */
548 83720 : pq_sendint32(out, RelationGetRelid(rel));
549 :
550 83720 : if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
551 240 : pq_sendbyte(out, 'O'); /* old tuple follows */
552 : else
553 83480 : pq_sendbyte(out, 'K'); /* old key follows */
554 :
555 83720 : logicalrep_write_tuple(out, rel, oldslot, binary, columns);
556 83720 : }
557 :
558 : /*
559 : * Read DELETE from stream.
560 : *
561 : * Fills the old tuple.
562 : */
563 : LogicalRepRelId
564 80610 : logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
565 : {
566 : char action;
567 : LogicalRepRelId relid;
568 :
569 : /* read the relation id */
570 80610 : relid = pq_getmsgint(in, 4);
571 :
572 : /* read and verify action */
573 80610 : action = pq_getmsgbyte(in);
574 80610 : if (action != 'K' && action != 'O')
575 0 : elog(ERROR, "expected action 'O' or 'K', got %c", action);
576 :
577 80610 : logicalrep_read_tuple(in, oldtup);
578 :
579 80610 : return relid;
580 : }
581 :
582 : /*
583 : * Write TRUNCATE to the output stream.
584 : */
585 : void
586 16 : logicalrep_write_truncate(StringInfo out,
587 : TransactionId xid,
588 : int nrelids,
589 : Oid relids[],
590 : bool cascade, bool restart_seqs)
591 : {
592 : int i;
593 16 : uint8 flags = 0;
594 :
595 16 : pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
596 :
597 : /* transaction ID (if not valid, we're not streaming) */
598 16 : if (TransactionIdIsValid(xid))
599 0 : pq_sendint32(out, xid);
600 :
601 16 : pq_sendint32(out, nrelids);
602 :
603 : /* encode and send truncate flags */
604 16 : if (cascade)
605 0 : flags |= TRUNCATE_CASCADE;
606 16 : if (restart_seqs)
607 0 : flags |= TRUNCATE_RESTART_SEQS;
608 16 : pq_sendint8(out, flags);
609 :
610 42 : for (i = 0; i < nrelids; i++)
611 26 : pq_sendint32(out, relids[i]);
612 16 : }
613 :
614 : /*
615 : * Read TRUNCATE from stream.
616 : */
617 : List *
618 38 : logicalrep_read_truncate(StringInfo in,
619 : bool *cascade, bool *restart_seqs)
620 : {
621 : int i;
622 : int nrelids;
623 38 : List *relids = NIL;
624 : uint8 flags;
625 :
626 38 : nrelids = pq_getmsgint(in, 4);
627 :
628 : /* read and decode truncate flags */
629 38 : flags = pq_getmsgint(in, 1);
630 38 : *cascade = (flags & TRUNCATE_CASCADE) > 0;
631 38 : *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
632 :
633 94 : for (i = 0; i < nrelids; i++)
634 56 : relids = lappend_oid(relids, pq_getmsgint(in, 4));
635 :
636 38 : return relids;
637 : }
638 :
639 : /*
640 : * Write MESSAGE to stream
641 : */
642 : void
643 10 : logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
644 : bool transactional, const char *prefix, Size sz,
645 : const char *message)
646 : {
647 10 : uint8 flags = 0;
648 :
649 10 : pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
650 :
651 : /* encode and send message flags */
652 10 : if (transactional)
653 4 : flags |= MESSAGE_TRANSACTIONAL;
654 :
655 : /* transaction ID (if not valid, we're not streaming) */
656 10 : if (TransactionIdIsValid(xid))
657 0 : pq_sendint32(out, xid);
658 :
659 10 : pq_sendint8(out, flags);
660 10 : pq_sendint64(out, lsn);
661 10 : pq_sendstring(out, prefix);
662 10 : pq_sendint32(out, sz);
663 10 : pq_sendbytes(out, message, sz);
664 10 : }
665 :
666 : /*
667 : * Write relation description to the output stream.
668 : */
669 : void
670 618 : logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
671 : Bitmapset *columns)
672 : {
673 : char *relname;
674 :
675 618 : pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
676 :
677 : /* transaction ID (if not valid, we're not streaming) */
678 618 : if (TransactionIdIsValid(xid))
679 142 : pq_sendint32(out, xid);
680 :
681 : /* use Oid as relation identifier */
682 618 : pq_sendint32(out, RelationGetRelid(rel));
683 :
684 : /* send qualified relation name */
685 618 : logicalrep_write_namespace(out, RelationGetNamespace(rel));
686 618 : relname = RelationGetRelationName(rel);
687 618 : pq_sendstring(out, relname);
688 :
689 : /* send replica identity */
690 618 : pq_sendbyte(out, rel->rd_rel->relreplident);
691 :
692 : /* send the attribute info */
693 618 : logicalrep_write_attrs(out, rel, columns);
694 618 : }
695 :
696 : /*
697 : * Read the relation info from stream and return as LogicalRepRelation.
698 : */
699 : LogicalRepRelation *
700 756 : logicalrep_read_rel(StringInfo in)
701 : {
702 756 : LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
703 :
704 756 : rel->remoteid = pq_getmsgint(in, 4);
705 :
706 : /* Read relation name from stream */
707 756 : rel->nspname = pstrdup(logicalrep_read_namespace(in));
708 756 : rel->relname = pstrdup(pq_getmsgstring(in));
709 :
710 : /* Read the replica identity. */
711 756 : rel->replident = pq_getmsgbyte(in);
712 :
713 : /* Get attribute description */
714 756 : logicalrep_read_attrs(in, rel);
715 :
716 756 : return rel;
717 : }
718 :
719 : /*
720 : * Write type info to the output stream.
721 : *
722 : * This function will always write base type info.
723 : */
724 : void
725 36 : logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
726 : {
727 36 : Oid basetypoid = getBaseType(typoid);
728 : HeapTuple tup;
729 : Form_pg_type typtup;
730 :
731 36 : pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
732 :
733 : /* transaction ID (if not valid, we're not streaming) */
734 36 : if (TransactionIdIsValid(xid))
735 0 : pq_sendint32(out, xid);
736 :
737 36 : tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
738 36 : if (!HeapTupleIsValid(tup))
739 0 : elog(ERROR, "cache lookup failed for type %u", basetypoid);
740 36 : typtup = (Form_pg_type) GETSTRUCT(tup);
741 :
742 : /* use Oid as relation identifier */
743 36 : pq_sendint32(out, typoid);
744 :
745 : /* send qualified type name */
746 36 : logicalrep_write_namespace(out, typtup->typnamespace);
747 36 : pq_sendstring(out, NameStr(typtup->typname));
748 :
749 36 : ReleaseSysCache(tup);
750 36 : }
751 :
752 : /*
753 : * Read type info from the output stream.
754 : */
755 : void
756 36 : logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
757 : {
758 36 : ltyp->remoteid = pq_getmsgint(in, 4);
759 :
760 : /* Read type name from stream */
761 36 : ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
762 36 : ltyp->typname = pstrdup(pq_getmsgstring(in));
763 36 : }
764 :
765 : /*
766 : * Write a tuple to the outputstream, in the most efficient format possible.
767 : */
768 : static void
769 364426 : logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
770 : bool binary, Bitmapset *columns)
771 : {
772 : TupleDesc desc;
773 : Datum *values;
774 : bool *isnull;
775 : int i;
776 364426 : uint16 nliveatts = 0;
777 :
778 364426 : desc = RelationGetDescr(rel);
779 :
780 1096160 : for (i = 0; i < desc->natts; i++)
781 : {
782 731734 : Form_pg_attribute att = TupleDescAttr(desc, i);
783 :
784 731734 : if (att->attisdropped || att->attgenerated)
785 16 : continue;
786 :
787 731718 : if (!column_in_column_list(att->attnum, columns))
788 204 : continue;
789 :
790 731514 : nliveatts++;
791 : }
792 364426 : pq_sendint16(out, nliveatts);
793 :
794 364426 : slot_getallattrs(slot);
795 364426 : values = slot->tts_values;
796 364426 : isnull = slot->tts_isnull;
797 :
798 : /* Write the values */
799 1096160 : for (i = 0; i < desc->natts; i++)
800 : {
801 : HeapTuple typtup;
802 : Form_pg_type typclass;
803 731734 : Form_pg_attribute att = TupleDescAttr(desc, i);
804 :
805 731734 : if (att->attisdropped || att->attgenerated)
806 16 : continue;
807 :
808 731718 : if (!column_in_column_list(att->attnum, columns))
809 204 : continue;
810 :
811 731514 : if (isnull[i])
812 : {
813 103746 : pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
814 103746 : continue;
815 : }
816 :
817 627768 : if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
818 : {
819 : /*
820 : * Unchanged toasted datum. (Note that we don't promise to detect
821 : * unchanged data in general; this is just a cheap check to avoid
822 : * sending large values unnecessarily.)
823 : */
824 6 : pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
825 6 : continue;
826 : }
827 :
828 627762 : typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
829 627762 : if (!HeapTupleIsValid(typtup))
830 0 : elog(ERROR, "cache lookup failed for type %u", att->atttypid);
831 627762 : typclass = (Form_pg_type) GETSTRUCT(typtup);
832 :
833 : /*
834 : * Send in binary if requested and type has suitable send function.
835 : */
836 627762 : if (binary && OidIsValid(typclass->typsend))
837 230086 : {
838 : bytea *outputbytes;
839 : int len;
840 :
841 230086 : pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
842 230086 : outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
843 230086 : len = VARSIZE(outputbytes) - VARHDRSZ;
844 230086 : pq_sendint(out, len, 4); /* length */
845 230086 : pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
846 230086 : pfree(outputbytes);
847 : }
848 : else
849 : {
850 : char *outputstr;
851 :
852 397676 : pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
853 397676 : outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
854 397676 : pq_sendcountedtext(out, outputstr, strlen(outputstr));
855 397676 : pfree(outputstr);
856 : }
857 :
858 627762 : ReleaseSysCache(typtup);
859 : }
860 364426 : }
861 :
862 : /*
863 : * Read tuple in logical replication format from stream.
864 : */
865 : static void
866 297242 : logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
867 : {
868 : int i;
869 : int natts;
870 :
871 : /* Get number of attributes */
872 297242 : natts = pq_getmsgint(in, 2);
873 :
874 : /* Allocate space for per-column values; zero out unused StringInfoDatas */
875 297242 : tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
876 297242 : tuple->colstatus = (char *) palloc(natts * sizeof(char));
877 297242 : tuple->ncols = natts;
878 :
879 : /* Read the data */
880 903512 : for (i = 0; i < natts; i++)
881 : {
882 : char *buff;
883 : char kind;
884 : int len;
885 606270 : StringInfo value = &tuple->colvalues[i];
886 :
887 606270 : kind = pq_getmsgbyte(in);
888 606270 : tuple->colstatus[i] = kind;
889 :
890 606270 : switch (kind)
891 : {
892 100712 : case LOGICALREP_COLUMN_NULL:
893 : /* nothing more to do */
894 100712 : break;
895 6 : case LOGICALREP_COLUMN_UNCHANGED:
896 : /* we don't receive the value of an unchanged column */
897 6 : break;
898 505552 : case LOGICALREP_COLUMN_TEXT:
899 : case LOGICALREP_COLUMN_BINARY:
900 505552 : len = pq_getmsgint(in, 4); /* read length */
901 :
902 : /* and data */
903 505552 : buff = palloc(len + 1);
904 505552 : pq_copymsgbytes(in, buff, len);
905 :
906 : /*
907 : * NUL termination is required for LOGICALREP_COLUMN_TEXT mode
908 : * as input functions require that. For
909 : * LOGICALREP_COLUMN_BINARY it's not technically required, but
910 : * it's harmless.
911 : */
912 505552 : buff[len] = '\0';
913 :
914 505552 : initStringInfoFromString(value, buff, len);
915 505552 : break;
916 0 : default:
917 0 : elog(ERROR, "unrecognized data representation type '%c'", kind);
918 : }
919 : }
920 297242 : }
921 :
922 : /*
923 : * Write relation attribute metadata to the stream.
924 : */
925 : static void
926 618 : logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
927 : {
928 : TupleDesc desc;
929 : int i;
930 618 : uint16 nliveatts = 0;
931 618 : Bitmapset *idattrs = NULL;
932 : bool replidentfull;
933 :
934 618 : desc = RelationGetDescr(rel);
935 :
936 : /* send number of live attributes */
937 1844 : for (i = 0; i < desc->natts; i++)
938 : {
939 1226 : Form_pg_attribute att = TupleDescAttr(desc, i);
940 :
941 1226 : if (att->attisdropped || att->attgenerated)
942 10 : continue;
943 :
944 1216 : if (!column_in_column_list(att->attnum, columns))
945 106 : continue;
946 :
947 1110 : nliveatts++;
948 : }
949 618 : pq_sendint16(out, nliveatts);
950 :
951 : /* fetch bitmap of REPLICATION IDENTITY attributes */
952 618 : replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
953 618 : if (!replidentfull)
954 540 : idattrs = RelationGetIdentityKeyBitmap(rel);
955 :
956 : /* send the attributes */
957 1844 : for (i = 0; i < desc->natts; i++)
958 : {
959 1226 : Form_pg_attribute att = TupleDescAttr(desc, i);
960 1226 : uint8 flags = 0;
961 :
962 1226 : if (att->attisdropped || att->attgenerated)
963 10 : continue;
964 :
965 1216 : if (!column_in_column_list(att->attnum, columns))
966 106 : continue;
967 :
968 : /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
969 2112 : if (replidentfull ||
970 1002 : bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
971 : idattrs))
972 526 : flags |= LOGICALREP_IS_REPLICA_IDENTITY;
973 :
974 1110 : pq_sendbyte(out, flags);
975 :
976 : /* attribute name */
977 1110 : pq_sendstring(out, NameStr(att->attname));
978 :
979 : /* attribute type id */
980 1110 : pq_sendint32(out, (int) att->atttypid);
981 :
982 : /* attribute mode */
983 1110 : pq_sendint32(out, att->atttypmod);
984 : }
985 :
986 618 : bms_free(idattrs);
987 618 : }
988 :
989 : /*
990 : * Read relation attribute metadata from the stream.
991 : */
992 : static void
993 756 : logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
994 : {
995 : int i;
996 : int natts;
997 : char **attnames;
998 : Oid *atttyps;
999 756 : Bitmapset *attkeys = NULL;
1000 :
1001 756 : natts = pq_getmsgint(in, 2);
1002 756 : attnames = palloc(natts * sizeof(char *));
1003 756 : atttyps = palloc(natts * sizeof(Oid));
1004 :
1005 : /* read the attributes */
1006 2110 : for (i = 0; i < natts; i++)
1007 : {
1008 : uint8 flags;
1009 :
1010 : /* Check for replica identity column */
1011 1354 : flags = pq_getmsgbyte(in);
1012 1354 : if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
1013 634 : attkeys = bms_add_member(attkeys, i);
1014 :
1015 : /* attribute name */
1016 1354 : attnames[i] = pstrdup(pq_getmsgstring(in));
1017 :
1018 : /* attribute type id */
1019 1354 : atttyps[i] = (Oid) pq_getmsgint(in, 4);
1020 :
1021 : /* we ignore attribute mode for now */
1022 1354 : (void) pq_getmsgint(in, 4);
1023 : }
1024 :
1025 756 : rel->attnames = attnames;
1026 756 : rel->atttyps = atttyps;
1027 756 : rel->attkeys = attkeys;
1028 756 : rel->natts = natts;
1029 756 : }
1030 :
1031 : /*
1032 : * Write the namespace name or empty string for pg_catalog (to save space).
1033 : */
1034 : static void
1035 654 : logicalrep_write_namespace(StringInfo out, Oid nspid)
1036 : {
1037 654 : if (nspid == PG_CATALOG_NAMESPACE)
1038 2 : pq_sendbyte(out, '\0');
1039 : else
1040 : {
1041 652 : char *nspname = get_namespace_name(nspid);
1042 :
1043 652 : if (nspname == NULL)
1044 0 : elog(ERROR, "cache lookup failed for namespace %u",
1045 : nspid);
1046 :
1047 652 : pq_sendstring(out, nspname);
1048 : }
1049 654 : }
1050 :
1051 : /*
1052 : * Read the namespace name while treating empty string as pg_catalog.
1053 : */
1054 : static const char *
1055 792 : logicalrep_read_namespace(StringInfo in)
1056 : {
1057 792 : const char *nspname = pq_getmsgstring(in);
1058 :
1059 792 : if (nspname[0] == '\0')
1060 2 : nspname = "pg_catalog";
1061 :
1062 792 : return nspname;
1063 : }
1064 :
1065 : /*
1066 : * Write the information for the start stream message to the output stream.
1067 : */
1068 : void
1069 1274 : logicalrep_write_stream_start(StringInfo out,
1070 : TransactionId xid, bool first_segment)
1071 : {
1072 1274 : pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
1073 :
1074 : Assert(TransactionIdIsValid(xid));
1075 :
1076 : /* transaction ID (we're starting to stream, so must be valid) */
1077 1274 : pq_sendint32(out, xid);
1078 :
1079 : /* 1 if this is the first streaming segment for this xid */
1080 1274 : pq_sendbyte(out, first_segment ? 1 : 0);
1081 1274 : }
1082 :
1083 : /*
1084 : * Read the information about the start stream message from output stream.
1085 : */
1086 : TransactionId
1087 1676 : logicalrep_read_stream_start(StringInfo in, bool *first_segment)
1088 : {
1089 : TransactionId xid;
1090 :
1091 : Assert(first_segment);
1092 :
1093 1676 : xid = pq_getmsgint(in, 4);
1094 1676 : *first_segment = (pq_getmsgbyte(in) == 1);
1095 :
1096 1676 : return xid;
1097 : }
1098 :
1099 : /*
1100 : * Write the stop stream message to the output stream.
1101 : */
1102 : void
1103 1274 : logicalrep_write_stream_stop(StringInfo out)
1104 : {
1105 1274 : pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_STOP);
1106 1274 : }
1107 :
1108 : /*
1109 : * Write STREAM COMMIT to the output stream.
1110 : */
1111 : void
1112 92 : logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
1113 : XLogRecPtr commit_lsn)
1114 : {
1115 92 : uint8 flags = 0;
1116 :
1117 92 : pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
1118 :
1119 : Assert(TransactionIdIsValid(txn->xid));
1120 :
1121 : /* transaction ID */
1122 92 : pq_sendint32(out, txn->xid);
1123 :
1124 : /* send the flags field (unused for now) */
1125 92 : pq_sendbyte(out, flags);
1126 :
1127 : /* send fields */
1128 92 : pq_sendint64(out, commit_lsn);
1129 92 : pq_sendint64(out, txn->end_lsn);
1130 92 : pq_sendint64(out, txn->xact_time.commit_time);
1131 92 : }
1132 :
1133 : /*
1134 : * Read STREAM COMMIT from the output stream.
1135 : */
1136 : TransactionId
1137 122 : logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
1138 : {
1139 : TransactionId xid;
1140 : uint8 flags;
1141 :
1142 122 : xid = pq_getmsgint(in, 4);
1143 :
1144 : /* read flags (unused for now) */
1145 122 : flags = pq_getmsgbyte(in);
1146 :
1147 122 : if (flags != 0)
1148 0 : elog(ERROR, "unrecognized flags %u in commit message", flags);
1149 :
1150 : /* read fields */
1151 122 : commit_data->commit_lsn = pq_getmsgint64(in);
1152 122 : commit_data->end_lsn = pq_getmsgint64(in);
1153 122 : commit_data->committime = pq_getmsgint64(in);
1154 :
1155 122 : return xid;
1156 : }
1157 :
1158 : /*
1159 : * Write STREAM ABORT to the output stream. Note that xid and subxid will be
1160 : * same for the top-level transaction abort.
1161 : *
1162 : * If write_abort_info is true, send the abort_lsn and abort_time fields,
1163 : * otherwise don't.
1164 : */
1165 : void
1166 52 : logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
1167 : TransactionId subxid, XLogRecPtr abort_lsn,
1168 : TimestampTz abort_time, bool write_abort_info)
1169 : {
1170 52 : pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
1171 :
1172 : Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
1173 :
1174 : /* transaction ID */
1175 52 : pq_sendint32(out, xid);
1176 52 : pq_sendint32(out, subxid);
1177 :
1178 52 : if (write_abort_info)
1179 : {
1180 24 : pq_sendint64(out, abort_lsn);
1181 24 : pq_sendint64(out, abort_time);
1182 : }
1183 52 : }
1184 :
1185 : /*
1186 : * Read STREAM ABORT from the output stream.
1187 : *
1188 : * If read_abort_info is true, read the abort_lsn and abort_time fields,
1189 : * otherwise don't.
1190 : */
1191 : void
1192 76 : logicalrep_read_stream_abort(StringInfo in,
1193 : LogicalRepStreamAbortData *abort_data,
1194 : bool read_abort_info)
1195 : {
1196 : Assert(abort_data);
1197 :
1198 76 : abort_data->xid = pq_getmsgint(in, 4);
1199 76 : abort_data->subxid = pq_getmsgint(in, 4);
1200 :
1201 76 : if (read_abort_info)
1202 : {
1203 48 : abort_data->abort_lsn = pq_getmsgint64(in);
1204 48 : abort_data->abort_time = pq_getmsgint64(in);
1205 : }
1206 : else
1207 : {
1208 28 : abort_data->abort_lsn = InvalidXLogRecPtr;
1209 28 : abort_data->abort_time = 0;
1210 : }
1211 76 : }
1212 :
1213 : /*
1214 : * Get string representing LogicalRepMsgType.
1215 : */
1216 : const char *
1217 678 : logicalrep_message_type(LogicalRepMsgType action)
1218 : {
1219 : static char err_unknown[20];
1220 :
1221 678 : switch (action)
1222 : {
1223 2 : case LOGICAL_REP_MSG_BEGIN:
1224 2 : return "BEGIN";
1225 2 : case LOGICAL_REP_MSG_COMMIT:
1226 2 : return "COMMIT";
1227 0 : case LOGICAL_REP_MSG_ORIGIN:
1228 0 : return "ORIGIN";
1229 74 : case LOGICAL_REP_MSG_INSERT:
1230 74 : return "INSERT";
1231 18 : case LOGICAL_REP_MSG_UPDATE:
1232 18 : return "UPDATE";
1233 10 : case LOGICAL_REP_MSG_DELETE:
1234 10 : return "DELETE";
1235 0 : case LOGICAL_REP_MSG_TRUNCATE:
1236 0 : return "TRUNCATE";
1237 4 : case LOGICAL_REP_MSG_RELATION:
1238 4 : return "RELATION";
1239 0 : case LOGICAL_REP_MSG_TYPE:
1240 0 : return "TYPE";
1241 0 : case LOGICAL_REP_MSG_MESSAGE:
1242 0 : return "MESSAGE";
1243 2 : case LOGICAL_REP_MSG_BEGIN_PREPARE:
1244 2 : return "BEGIN PREPARE";
1245 2 : case LOGICAL_REP_MSG_PREPARE:
1246 2 : return "PREPARE";
1247 0 : case LOGICAL_REP_MSG_COMMIT_PREPARED:
1248 0 : return "COMMIT PREPARED";
1249 0 : case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
1250 0 : return "ROLLBACK PREPARED";
1251 26 : case LOGICAL_REP_MSG_STREAM_START:
1252 26 : return "STREAM START";
1253 456 : case LOGICAL_REP_MSG_STREAM_STOP:
1254 456 : return "STREAM STOP";
1255 40 : case LOGICAL_REP_MSG_STREAM_COMMIT:
1256 40 : return "STREAM COMMIT";
1257 38 : case LOGICAL_REP_MSG_STREAM_ABORT:
1258 38 : return "STREAM ABORT";
1259 4 : case LOGICAL_REP_MSG_STREAM_PREPARE:
1260 4 : return "STREAM PREPARE";
1261 : }
1262 :
1263 : /*
1264 : * This message provides context in the error raised when applying a
1265 : * logical message. So we can't throw an error here. Return an unknown
1266 : * indicator value so that the original error is still reported.
1267 : */
1268 0 : snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);
1269 :
1270 0 : return err_unknown;
1271 : }
|