Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * logicalfuncs.c
4 : *
5 : * Support functions for using logical decoding and management of
6 : * logical replication slots via SQL.
7 : *
8 : *
9 : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
10 : *
11 : * IDENTIFICATION
12 : * src/backend/replication/logical/logicalfuncs.c
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres.h"
17 :
18 : #include <unistd.h>
19 :
20 : #include "access/xlogrecovery.h"
21 : #include "access/xlogutils.h"
22 : #include "catalog/pg_type.h"
23 : #include "fmgr.h"
24 : #include "funcapi.h"
25 : #include "mb/pg_wchar.h"
26 : #include "miscadmin.h"
27 : #include "nodes/makefuncs.h"
28 : #include "replication/decode.h"
29 : #include "replication/logical.h"
30 : #include "replication/message.h"
31 : #include "utils/array.h"
32 : #include "utils/builtins.h"
33 : #include "utils/inval.h"
34 : #include "utils/memutils.h"
35 : #include "utils/pg_lsn.h"
36 : #include "utils/regproc.h"
37 : #include "utils/resowner.h"
38 :
/*
 * Private data for writing out data.
 *
 * One instance is allocated per call of pg_logical_slot_get_changes_guts()
 * and handed to the decoding context as its output_writer_private pointer,
 * from where LogicalOutputWrite() picks it up again.
 */
typedef struct DecodingOutputState
{
	/* tuplestore that LogicalOutputWrite() appends result rows to */
	Tuplestorestate *tupstore;
	/* row descriptor passed along with every tuplestore_putvalues() call */
	TupleDesc	tupdesc;
	/* if false, output is asserted to be valid in the database encoding */
	bool		binary_output;
	/* number of rows written so far; compared against upto_nchanges */
	int64		returned_rows;
} DecodingOutputState;
47 :
48 : /*
49 : * Prepare for an output plugin write.
50 : */
51 : static void
52 302404 : LogicalOutputPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
53 : bool last_write)
54 : {
55 302404 : resetStringInfo(ctx->out);
56 302404 : }
57 :
58 : /*
59 : * Perform output plugin write into tuplestore.
60 : */
61 : static void
62 302404 : LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
63 : bool last_write)
64 : {
65 : Datum values[3];
66 : bool nulls[3];
67 : DecodingOutputState *p;
68 :
69 : /* SQL Datums can only be of a limited length... */
70 302404 : if (ctx->out->len > MaxAllocSize - VARHDRSZ)
71 0 : elog(ERROR, "too much output for sql interface");
72 :
73 302404 : p = (DecodingOutputState *) ctx->output_writer_private;
74 :
75 302404 : memset(nulls, 0, sizeof(nulls));
76 302404 : values[0] = LSNGetDatum(lsn);
77 302404 : values[1] = TransactionIdGetDatum(xid);
78 :
79 : /*
80 : * Assert ctx->out is in database encoding when we're writing textual
81 : * output.
82 : */
83 302404 : if (!p->binary_output)
84 : Assert(pg_verify_mbstr(GetDatabaseEncoding(),
85 : ctx->out->data, ctx->out->len,
86 : false));
87 :
88 : /* ick, but cstring_to_text_with_len works for bytea perfectly fine */
89 302404 : values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len));
90 :
91 302404 : tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
92 302404 : p->returned_rows++;
93 302404 : }
94 :
95 : /*
96 : * Helper function for the various SQL callable logical decoding functions.
97 : */
98 : static Datum
99 424 : pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary)
100 : {
101 : Name name;
102 : XLogRecPtr upto_lsn;
103 : int32 upto_nchanges;
104 424 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
105 : MemoryContext per_query_ctx;
106 : MemoryContext oldcontext;
107 : XLogRecPtr end_of_wal;
108 : XLogRecPtr wait_for_wal_lsn;
109 : LogicalDecodingContext *ctx;
110 424 : ResourceOwner old_resowner = CurrentResourceOwner;
111 : ArrayType *arr;
112 : Size ndim;
113 424 : List *options = NIL;
114 : DecodingOutputState *p;
115 :
116 424 : CheckSlotPermissions();
117 :
118 422 : CheckLogicalDecodingRequirements();
119 :
120 422 : if (PG_ARGISNULL(0))
121 0 : ereport(ERROR,
122 : (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
123 : errmsg("slot name must not be null")));
124 422 : name = PG_GETARG_NAME(0);
125 :
126 422 : if (PG_ARGISNULL(1))
127 422 : upto_lsn = InvalidXLogRecPtr;
128 : else
129 0 : upto_lsn = PG_GETARG_LSN(1);
130 :
131 422 : if (PG_ARGISNULL(2))
132 422 : upto_nchanges = InvalidXLogRecPtr;
133 : else
134 0 : upto_nchanges = PG_GETARG_INT32(2);
135 :
136 422 : if (PG_ARGISNULL(3))
137 0 : ereport(ERROR,
138 : (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
139 : errmsg("options array must not be null")));
140 422 : arr = PG_GETARG_ARRAYTYPE_P(3);
141 :
142 : /* state to write output to */
143 422 : p = palloc0(sizeof(DecodingOutputState));
144 :
145 422 : p->binary_output = binary;
146 :
147 422 : per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
148 422 : oldcontext = MemoryContextSwitchTo(per_query_ctx);
149 :
150 : /* Deconstruct options array */
151 422 : ndim = ARR_NDIM(arr);
152 422 : if (ndim > 1)
153 : {
154 0 : ereport(ERROR,
155 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
156 : errmsg("array must be one-dimensional")));
157 : }
158 422 : else if (array_contains_nulls(arr))
159 : {
160 0 : ereport(ERROR,
161 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
162 : errmsg("array must not contain nulls")));
163 : }
164 422 : else if (ndim == 1)
165 : {
166 : int nelems;
167 : Datum *datum_opts;
168 : int i;
169 :
170 : Assert(ARR_ELEMTYPE(arr) == TEXTOID);
171 :
172 368 : deconstruct_array_builtin(arr, TEXTOID, &datum_opts, NULL, &nelems);
173 :
174 368 : if (nelems % 2 != 0)
175 0 : ereport(ERROR,
176 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
177 : errmsg("array must have even number of elements")));
178 :
179 1116 : for (i = 0; i < nelems; i += 2)
180 : {
181 748 : char *optname = TextDatumGetCString(datum_opts[i]);
182 748 : char *opt = TextDatumGetCString(datum_opts[i + 1]);
183 :
184 748 : options = lappend(options, makeDefElem(optname, (Node *) makeString(opt), -1));
185 : }
186 : }
187 :
188 422 : InitMaterializedSRF(fcinfo, 0);
189 422 : p->tupstore = rsinfo->setResult;
190 422 : p->tupdesc = rsinfo->setDesc;
191 :
192 : /*
193 : * Compute the current end-of-wal.
194 : */
195 422 : if (!RecoveryInProgress())
196 410 : end_of_wal = GetFlushRecPtr(NULL);
197 : else
198 12 : end_of_wal = GetXLogReplayRecPtr(NULL);
199 :
200 422 : ReplicationSlotAcquire(NameStr(*name), true);
201 :
202 420 : PG_TRY();
203 : {
204 : /* restart at slot's confirmed_flush */
205 826 : ctx = CreateDecodingContext(InvalidXLogRecPtr,
206 : options,
207 : false,
208 420 : XL_ROUTINE(.page_read = read_local_xlog_page,
209 : .segment_open = wal_segment_open,
210 : .segment_close = wal_segment_close),
211 : LogicalOutputPrepareWrite,
212 : LogicalOutputWrite, NULL);
213 :
214 406 : MemoryContextSwitchTo(oldcontext);
215 :
216 : /*
217 : * Check whether the output plugin writes textual output if that's
218 : * what we need.
219 : */
220 406 : if (!binary &&
221 382 : ctx->options.output_type !=OUTPUT_PLUGIN_TEXTUAL_OUTPUT)
222 2 : ereport(ERROR,
223 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
224 : errmsg("logical decoding output plugin \"%s\" produces binary output, but function \"%s\" expects textual data",
225 : NameStr(MyReplicationSlot->data.plugin),
226 : format_procedure(fcinfo->flinfo->fn_oid))));
227 :
228 : /*
229 : * Wait for specified streaming replication standby servers (if any)
230 : * to confirm receipt of WAL up to wait_for_wal_lsn.
231 : */
232 404 : if (XLogRecPtrIsInvalid(upto_lsn))
233 404 : wait_for_wal_lsn = end_of_wal;
234 : else
235 0 : wait_for_wal_lsn = Min(upto_lsn, end_of_wal);
236 :
237 404 : WaitForStandbyConfirmation(wait_for_wal_lsn);
238 :
239 404 : ctx->output_writer_private = p;
240 :
241 : /*
242 : * Decoding of WAL must start at restart_lsn so that the entirety of
243 : * xacts that committed after the slot's confirmed_flush can be
244 : * accumulated into reorder buffers.
245 : */
246 404 : XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
247 :
248 : /* invalidate non-timetravel entries */
249 404 : InvalidateSystemCaches();
250 :
251 : /* Decode until we run out of records */
252 3297790 : while (ctx->reader->EndRecPtr < end_of_wal)
253 : {
254 : XLogRecord *record;
255 3297386 : char *errm = NULL;
256 :
257 3297386 : record = XLogReadRecord(ctx->reader, &errm);
258 3297386 : if (errm)
259 0 : elog(ERROR, "could not find record for logical decoding: %s", errm);
260 :
261 : /*
262 : * The {begin_txn,change,commit_txn}_wrapper callbacks above will
263 : * store the description into our tuplestore.
264 : */
265 3297386 : if (record != NULL)
266 3297386 : LogicalDecodingProcessRecord(ctx, ctx->reader);
267 :
268 : /* check limits */
269 3297386 : if (upto_lsn != InvalidXLogRecPtr &&
270 0 : upto_lsn <= ctx->reader->EndRecPtr)
271 0 : break;
272 3297386 : if (upto_nchanges != 0 &&
273 0 : upto_nchanges <= p->returned_rows)
274 0 : break;
275 3297386 : CHECK_FOR_INTERRUPTS();
276 : }
277 :
278 : /*
279 : * Logical decoding could have clobbered CurrentResourceOwner during
280 : * transaction management, so restore the executor's value. (This is
281 : * a kluge, but it's not worth cleaning up right now.)
282 : */
283 404 : CurrentResourceOwner = old_resowner;
284 :
285 : /*
286 : * Next time, start where we left off. (Hunting things, the family
287 : * business..)
288 : */
289 404 : if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm)
290 : {
291 362 : LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
292 :
293 : /*
294 : * If only the confirmed_flush_lsn has changed the slot won't get
295 : * marked as dirty by the above. Callers on the walsender
296 : * interface are expected to keep track of their own progress and
297 : * don't need it written out. But SQL-interface users cannot
298 : * specify their own start positions and it's harder for them to
299 : * keep track of their progress, so we should make more of an
300 : * effort to save it for them.
301 : *
302 : * Dirty the slot so it's written out at the next checkpoint.
303 : * We'll still lose its position on crash, as documented, but it's
304 : * better than always losing the position even on clean restart.
305 : */
306 362 : ReplicationSlotMarkDirty();
307 : }
308 :
309 : /* free context, call shutdown callback */
310 404 : FreeDecodingContext(ctx);
311 :
312 404 : ReplicationSlotRelease();
313 404 : InvalidateSystemCaches();
314 : }
315 16 : PG_CATCH();
316 : {
317 : /* clear all timetravel entries */
318 16 : InvalidateSystemCaches();
319 :
320 16 : PG_RE_THROW();
321 : }
322 404 : PG_END_TRY();
323 :
324 404 : return (Datum) 0;
325 : }
326 :
327 : /*
328 : * SQL function returning the changestream as text, consuming the data.
329 : */
330 : Datum
331 368 : pg_logical_slot_get_changes(PG_FUNCTION_ARGS)
332 : {
333 368 : return pg_logical_slot_get_changes_guts(fcinfo, true, false);
334 : }
335 :
336 : /*
337 : * SQL function returning the changestream as text, only peeking ahead.
338 : */
339 : Datum
340 32 : pg_logical_slot_peek_changes(PG_FUNCTION_ARGS)
341 : {
342 32 : return pg_logical_slot_get_changes_guts(fcinfo, false, false);
343 : }
344 :
345 : /*
346 : * SQL function returning the changestream in binary, consuming the data.
347 : */
348 : Datum
349 8 : pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS)
350 : {
351 8 : return pg_logical_slot_get_changes_guts(fcinfo, true, true);
352 : }
353 :
354 : /*
355 : * SQL function returning the changestream in binary, only peeking ahead.
356 : */
357 : Datum
358 16 : pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
359 : {
360 16 : return pg_logical_slot_get_changes_guts(fcinfo, false, true);
361 : }
362 :
363 :
364 : /*
365 : * SQL function for writing logical decoding message into WAL.
366 : */
367 : Datum
368 228 : pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
369 : {
370 228 : bool transactional = PG_GETARG_BOOL(0);
371 228 : char *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
372 228 : bytea *data = PG_GETARG_BYTEA_PP(2);
373 228 : bool flush = PG_GETARG_BOOL(3);
374 : XLogRecPtr lsn;
375 :
376 228 : lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
377 : transactional, flush);
378 228 : PG_RETURN_LSN(lsn);
379 : }
380 :
381 : Datum
382 228 : pg_logical_emit_message_text(PG_FUNCTION_ARGS)
383 : {
384 : /* bytea and text are compatible */
385 228 : return pg_logical_emit_message_bytea(fcinfo);
386 : }
|