Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pgrepack.c
4 : : * Logical Replication output plugin for REPACK command
5 : : *
6 : : * Copyright (c) 2026, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/backend/replication/pgrepack/pgrepack.c
10 : : *
11 : : *-------------------------------------------------------------------------
12 : : */
13 : : #include "postgres.h"
14 : :
15 : : #include "access/detoast.h"
16 : : #include "commands/repack.h"
17 : : #include "commands/repack_internal.h"
18 : : #include "replication/snapbuild.h"
19 : : #include "utils/memutils.h"
20 : :
85 alvherre@kurilemu.de 21 :GNC 8 : PG_MODULE_MAGIC;
22 : :
23 : : static void repack_startup(LogicalDecodingContext *ctx,
24 : : OutputPluginOptions *opt, bool is_init);
25 : : static void repack_shutdown(LogicalDecodingContext *ctx);
26 : : static void repack_begin_txn(LogicalDecodingContext *ctx,
27 : : ReorderBufferTXN *txn);
28 : : static void repack_commit_txn(LogicalDecodingContext *ctx,
29 : : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
30 : : static void repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
31 : : Relation relation, ReorderBufferChange *change);
32 : : static void repack_store_change(LogicalDecodingContext *ctx, Relation relation,
33 : : ConcurrentChangeKind kind, HeapTuple tuple);
34 : :
35 : : void
36 : 8 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
37 : : {
38 : 8 : cb->startup_cb = repack_startup;
39 : 8 : cb->begin_cb = repack_begin_txn;
40 : 8 : cb->change_cb = repack_process_change;
41 : 8 : cb->commit_cb = repack_commit_txn;
42 : 8 : cb->shutdown_cb = repack_shutdown;
43 : 8 : }
44 : :
45 : :
46 : : /* initialize this plugin */
47 : : static void
48 : 8 : repack_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
49 : : bool is_init)
50 : : {
51 : : RepackDecodingState *dstate;
52 : :
21 53 [ + + ]: 8 : if (!AmRepackWorker())
54 [ + - ]: 1 : ereport(ERROR,
55 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
56 : : errmsg("unsupported use of logical decoding plugin \"%s\"",
57 : : "pgrepack"),
58 : : errdetail("This plugin can only be used by %s.",
59 : : "REPACK (CONCURRENTLY)"));
60 : :
61 : : /* Initial setup of our private state */
62 [ - + ]: 7 : Assert(CurrentMemoryContext == ctx->context);
63 : 7 : dstate = palloc0_object(RepackDecodingState);
64 : 7 : dstate->change_cxt = AllocSetContextCreate(ctx->context,
65 : : "REPACK - change",
66 : : ALLOCSET_DEFAULT_SIZES);
67 : : /* repack_setup_logical_decoding fills in the rest */
68 : 7 : ctx->output_writer_private = dstate;
69 : :
70 : : /* Probably unnecessary, as we don't use the SQL interface ... */
85 71 : 7 : opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
72 : :
73 [ - + ]: 7 : if (ctx->output_plugin_options != NIL)
74 : : {
85 alvherre@kurilemu.de 75 [ # # ]:UNC 0 : ereport(ERROR,
76 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
77 : : errmsg("this plugin does not expect any options"));
78 : : }
85 alvherre@kurilemu.de 79 :GNC 7 : }
80 : :
81 : : static void
82 : 7 : repack_shutdown(LogicalDecodingContext *ctx)
83 : : {
84 : 7 : }
85 : :
86 : : /*
87 : : * As we don't release the slot during processing of particular table, there's
88 : : * no room for SQL interface, even for debugging purposes. Therefore we need
89 : : * neither OutputPluginPrepareWrite() nor OutputPluginWrite() in the plugin
90 : : * callbacks. (Although we might want to write custom callbacks, this API
91 : : * seems to be unnecessarily generic for our purposes.)
92 : : */
93 : :
94 : : /* BEGIN callback */
95 : : static void
96 : 11 : repack_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
97 : : {
98 : 11 : }
99 : :
100 : : /* COMMIT callback */
101 : : static void
102 : 11 : repack_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
103 : : XLogRecPtr commit_lsn)
104 : : {
105 : 11 : }
106 : :
107 : : /*
108 : : * Callback for individual changed tuples
109 : : */
110 : : static void
111 : 32 : repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
112 : : Relation relation, ReorderBufferChange *change)
113 : : {
114 : 32 : RepackDecodingState *private PG_USED_FOR_ASSERTS_ONLY =
115 : : (RepackDecodingState *) ctx->output_writer_private;
116 : :
117 : : /* Changes of other relation should not have been decoded. */
118 [ - + ]: 32 : Assert(RelationGetRelid(relation) == private->relid);
119 : :
120 : : /* Decode entry depending on its type */
121 [ + + + - ]: 32 : switch (change->action)
122 : : {
123 : 7 : case REORDER_BUFFER_CHANGE_INSERT:
124 : : {
125 : : HeapTuple newtuple;
126 : :
127 : 7 : newtuple = change->data.tp.newtuple;
128 : :
129 : : /*
130 : : * Identity checks in the main function should have made this
131 : : * impossible.
132 : : */
133 [ - + ]: 7 : if (newtuple == NULL)
85 alvherre@kurilemu.de 134 [ # # ]:UNC 0 : elog(ERROR, "incomplete insert info");
135 : :
85 alvherre@kurilemu.de 136 :GNC 7 : repack_store_change(ctx, relation, CHANGE_INSERT, newtuple);
137 : : }
138 : 7 : break;
139 : 22 : case REORDER_BUFFER_CHANGE_UPDATE:
140 : : {
141 : : HeapTuple oldtuple,
142 : : newtuple;
143 : :
144 : 22 : oldtuple = change->data.tp.oldtuple;
145 : 22 : newtuple = change->data.tp.newtuple;
146 : :
147 [ - + ]: 22 : if (newtuple == NULL)
85 alvherre@kurilemu.de 148 [ # # ]:UNC 0 : elog(ERROR, "incomplete update info");
149 : :
85 alvherre@kurilemu.de 150 [ + + ]:GNC 22 : if (oldtuple != NULL)
151 : 8 : repack_store_change(ctx, relation, CHANGE_UPDATE_OLD, oldtuple);
152 : :
153 : 22 : repack_store_change(ctx, relation, CHANGE_UPDATE_NEW, newtuple);
154 : : }
155 : 22 : break;
156 : 3 : case REORDER_BUFFER_CHANGE_DELETE:
157 : : {
158 : : HeapTuple oldtuple;
159 : :
160 : 3 : oldtuple = change->data.tp.oldtuple;
161 : :
162 [ - + ]: 3 : if (oldtuple == NULL)
85 alvherre@kurilemu.de 163 [ # # ]:UNC 0 : elog(ERROR, "incomplete delete info");
164 : :
85 alvherre@kurilemu.de 165 :GNC 3 : repack_store_change(ctx, relation, CHANGE_DELETE, oldtuple);
166 : : }
167 : 3 : break;
85 alvherre@kurilemu.de 168 :UNC 0 : default:
169 : :
170 : : /*
171 : : * Should not come here. This includes TRUNCATE of the table being
172 : : * processed. heap_decode() cannot check the file locator easily,
173 : : * but we assume that TRUNCATE uses AccessExclusiveLock on the
174 : : * table so it should not occur during REPACK (CONCURRENTLY).
175 : : */
176 : 0 : Assert(false);
177 : : break;
178 : : }
85 alvherre@kurilemu.de 179 :GNC 32 : }
180 : :
181 : : /*
182 : : * Write the given tuple, with the given change kind, to the repack spill
183 : : * file. Later, the repack decoding worker can read these and replay
184 : : * the operations on the new copy of the table.
185 : : *
186 : : * For each change affecting the table being repacked, we store enough
187 : : * information about each tuple in it, so that it can be replayed in the
188 : : * new copy of the table.
189 : : */
190 : : static void
191 : 40 : repack_store_change(LogicalDecodingContext *ctx, Relation relation,
192 : : ConcurrentChangeKind kind, HeapTuple tuple)
193 : : {
194 : : RepackDecodingState *dstate;
195 : : MemoryContext oldcxt;
196 : : BufFile *file;
197 : 40 : List *attrs_ext = NIL;
198 : : int natt_ext;
199 : :
200 : 40 : dstate = (RepackDecodingState *) ctx->output_writer_private;
201 : 40 : file = dstate->file;
202 : :
203 : : /* Store the change kind. */
204 : 40 : BufFileWrite(file, &kind, 1);
205 : :
206 : : /* Use a frequently-reset context to avoid dealing with leaks manually */
207 : 40 : oldcxt = MemoryContextSwitchTo(dstate->change_cxt);
208 : :
209 : : /*
210 : : * If the tuple contains "external indirect" attributes, we need to write
211 : : * the contents to the file because we have no control over that memory.
212 : : */
213 [ + + ]: 40 : if (HeapTupleHasExternal(tuple))
214 : : {
215 : 13 : TupleDesc desc = RelationGetDescr(relation);
216 : : TupleTableSlot *slot;
217 : :
218 : : /* Initialize the slot, if not done already */
219 [ + + ]: 13 : if (dstate->slot == NULL)
220 : : {
221 : : ResourceOwner saveResourceOwner;
222 : :
223 : 1 : MemoryContextSwitchTo(dstate->worker_cxt);
224 : 1 : saveResourceOwner = CurrentResourceOwner;
225 : 1 : CurrentResourceOwner = dstate->worker_resowner;
226 : 1 : dstate->slot = MakeSingleTupleTableSlot(desc, &TTSOpsHeapTuple);
227 : 1 : MemoryContextSwitchTo(dstate->change_cxt);
228 : 1 : CurrentResourceOwner = saveResourceOwner;
229 : : }
230 : :
231 : 13 : slot = dstate->slot;
232 : 13 : ExecStoreHeapTuple(tuple, slot, false);
233 : :
234 : : /*
235 : : * Loop over all attributes, and find out which ones we need to spill
236 : : * separately, to wit: each one that's a non-null varlena and stored
237 : : * out of line.
238 : : */
239 [ + + ]: 78 : for (int i = 0; i < desc->natts; i++)
240 : : {
241 : 65 : CompactAttribute *attr = TupleDescCompactAttr(desc, i);
242 : : varlena *varlen;
243 : :
244 [ + + + + : 91 : if (attr->attisdropped || attr->attlen != -1 ||
- + ]
245 : 26 : slot_attisnull(slot, i + 1))
246 : 39 : continue;
247 : :
248 : 26 : slot_getsomeattrs(slot, i + 1);
249 : :
250 : : /*
251 : : * This is a non-null varlena datum, but we only care if it's
252 : : * out-of-line
253 : : */
254 : 26 : varlen = (varlena *) DatumGetPointer(slot->tts_values[i]);
255 [ + + ]: 26 : if (!VARATT_IS_EXTERNAL(varlen))
256 : 9 : continue;
257 : :
258 : : /*
259 : : * We spill any indirect-external attributes separately from the
260 : : * heap tuple. Anything else is written as is.
261 : : */
262 [ + + ]: 17 : if (VARATT_IS_EXTERNAL_INDIRECT(varlen))
263 : 15 : attrs_ext = lappend(attrs_ext, varlen);
264 : : else
265 : : {
266 : : /*
267 : : * Logical decoding should not produce "external expanded"
268 : : * attributes (those actually should never appear on disk), so
269 : : * only TOASTed attribute can be seen here.
270 : : *
271 : : * We get here if the table has external values but only
272 : : * in-line values are being updated now.
273 : : */
274 [ - + ]: 2 : Assert(VARATT_IS_EXTERNAL_ONDISK(varlen));
275 : : }
276 : : }
277 : :
278 : 13 : ExecClearTuple(slot);
279 : : }
280 : :
281 : : /*
282 : : * First, write the original heap tuple, prefixed by its length. Note
283 : : * that the external-toast tag for each toasted attribute will be present
284 : : * in what we write, so that we know where to restore each one later.
285 : : */
286 : 40 : BufFileWrite(file, &tuple->t_len, sizeof(tuple->t_len));
287 : 40 : BufFileWrite(file, tuple->t_data, tuple->t_len);
288 : :
289 : : /* Then, write the number of external attributes we found. */
290 : 40 : natt_ext = list_length(attrs_ext);
291 : 40 : BufFileWrite(file, &natt_ext, sizeof(natt_ext));
292 : :
293 : : /* Finally, the attributes themselves, if any */
294 [ + + + + : 95 : foreach_ptr(varlena, attr_val, attrs_ext)
+ + ]
295 : : {
296 : 15 : attr_val = detoast_external_attr(attr_val);
297 : 15 : BufFileWrite(file, attr_val, VARSIZE_ANY(attr_val));
298 : : /* These attributes could be large, so free them right away */
299 : 15 : pfree(attr_val);
300 : : }
301 : :
302 : : /* Cleanup. */
303 : 40 : MemoryContextSwitchTo(oldcxt);
304 : 40 : MemoryContextReset(dstate->change_cxt);
305 : 40 : }
|