Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pgrepack.c
4 : * Logical Replication output plugin for REPACK command
5 : *
6 : * Copyright (c) 2026, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/pgrepack/pgrepack.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/detoast.h"
16 : #include "commands/repack.h"
17 : #include "commands/repack_internal.h"
18 : #include "replication/snapbuild.h"
19 : #include "utils/memutils.h"
20 :
21 8 : PG_MODULE_MAGIC;
22 :
23 : static void repack_startup(LogicalDecodingContext *ctx,
24 : OutputPluginOptions *opt, bool is_init);
25 : static void repack_shutdown(LogicalDecodingContext *ctx);
26 : static void repack_begin_txn(LogicalDecodingContext *ctx,
27 : ReorderBufferTXN *txn);
28 : static void repack_commit_txn(LogicalDecodingContext *ctx,
29 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
30 : static void repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
31 : Relation relation, ReorderBufferChange *change);
32 : static void repack_store_change(LogicalDecodingContext *ctx, Relation relation,
33 : ConcurrentChangeKind kind, HeapTuple tuple);
34 :
35 : void
36 8 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
37 : {
38 8 : cb->startup_cb = repack_startup;
39 8 : cb->begin_cb = repack_begin_txn;
40 8 : cb->change_cb = repack_process_change;
41 8 : cb->commit_cb = repack_commit_txn;
42 8 : cb->shutdown_cb = repack_shutdown;
43 8 : }
44 :
45 :
46 : /* initialize this plugin */
47 : static void
48 8 : repack_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
49 : bool is_init)
50 : {
51 : RepackDecodingState *dstate;
52 :
53 8 : if (!AmRepackWorker())
54 1 : ereport(ERROR,
55 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
56 : errmsg("unsupported use of logical decoding plugin \"%s\"",
57 : "pgrepack"),
58 : errdetail("This plugin can only be used by %s.",
59 : "REPACK (CONCURRENTLY)"));
60 :
61 : /* Initial setup of our private state */
62 : Assert(CurrentMemoryContext == ctx->context);
63 7 : dstate = palloc0_object(RepackDecodingState);
64 7 : dstate->change_cxt = AllocSetContextCreate(ctx->context,
65 : "REPACK - change",
66 : ALLOCSET_DEFAULT_SIZES);
67 : /* repack_setup_logical_decoding fills in the rest */
68 7 : ctx->output_writer_private = dstate;
69 :
70 : /* Probably unnecessary, as we don't use the SQL interface ... */
71 7 : opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
72 :
73 7 : if (ctx->output_plugin_options != NIL)
74 : {
75 0 : ereport(ERROR,
76 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
77 : errmsg("this plugin does not expect any options"));
78 : }
79 7 : }
80 :
81 : static void
82 7 : repack_shutdown(LogicalDecodingContext *ctx)
83 : {
84 7 : }
85 :
/*
 * As we don't release the slot while processing a particular table, there's
 * no room for the SQL interface, even for debugging purposes. Therefore we
 * need neither OutputPluginPrepareWrite() nor OutputPluginWrite() in the
 * plugin callbacks. (Although we might want to write custom callbacks, this
 * API seems unnecessarily generic for our purposes.)
 */
93 :
94 : /* BEGIN callback */
95 : static void
96 11 : repack_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
97 : {
98 11 : }
99 :
100 : /* COMMIT callback */
101 : static void
102 11 : repack_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
103 : XLogRecPtr commit_lsn)
104 : {
105 11 : }
106 :
107 : /*
108 : * Callback for individual changed tuples
109 : */
110 : static void
111 32 : repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
112 : Relation relation, ReorderBufferChange *change)
113 : {
114 32 : RepackDecodingState *private PG_USED_FOR_ASSERTS_ONLY =
115 : (RepackDecodingState *) ctx->output_writer_private;
116 :
117 : /* Changes of other relation should not have been decoded. */
118 : Assert(RelationGetRelid(relation) == private->relid);
119 :
120 : /* Decode entry depending on its type */
121 32 : switch (change->action)
122 : {
123 7 : case REORDER_BUFFER_CHANGE_INSERT:
124 : {
125 : HeapTuple newtuple;
126 :
127 7 : newtuple = change->data.tp.newtuple;
128 :
129 : /*
130 : * Identity checks in the main function should have made this
131 : * impossible.
132 : */
133 7 : if (newtuple == NULL)
134 0 : elog(ERROR, "incomplete insert info");
135 :
136 7 : repack_store_change(ctx, relation, CHANGE_INSERT, newtuple);
137 : }
138 7 : break;
139 22 : case REORDER_BUFFER_CHANGE_UPDATE:
140 : {
141 : HeapTuple oldtuple,
142 : newtuple;
143 :
144 22 : oldtuple = change->data.tp.oldtuple;
145 22 : newtuple = change->data.tp.newtuple;
146 :
147 22 : if (newtuple == NULL)
148 0 : elog(ERROR, "incomplete update info");
149 :
150 22 : if (oldtuple != NULL)
151 8 : repack_store_change(ctx, relation, CHANGE_UPDATE_OLD, oldtuple);
152 :
153 22 : repack_store_change(ctx, relation, CHANGE_UPDATE_NEW, newtuple);
154 : }
155 22 : break;
156 3 : case REORDER_BUFFER_CHANGE_DELETE:
157 : {
158 : HeapTuple oldtuple;
159 :
160 3 : oldtuple = change->data.tp.oldtuple;
161 :
162 3 : if (oldtuple == NULL)
163 0 : elog(ERROR, "incomplete delete info");
164 :
165 3 : repack_store_change(ctx, relation, CHANGE_DELETE, oldtuple);
166 : }
167 3 : break;
168 0 : default:
169 :
170 : /*
171 : * Should not come here. This includes TRUNCATE of the table being
172 : * processed. heap_decode() cannot check the file locator easily,
173 : * but we assume that TRUNCATE uses AccessExclusiveLock on the
174 : * table so it should not occur during REPACK (CONCURRENTLY).
175 : */
176 : Assert(false);
177 0 : break;
178 : }
179 32 : }
180 :
181 : /*
182 : * Write the given tuple, with the given change kind, to the repack spill
183 : * file. Later, the repack decoding worker can read these and replay
184 : * the operations on the new copy of the table.
185 : *
186 : * For each change affecting the table being repacked, we store enough
187 : * information about each tuple in it, so that it can be replayed in the
188 : * new copy of the table.
189 : */
190 : static void
191 40 : repack_store_change(LogicalDecodingContext *ctx, Relation relation,
192 : ConcurrentChangeKind kind, HeapTuple tuple)
193 : {
194 : RepackDecodingState *dstate;
195 : MemoryContext oldcxt;
196 : BufFile *file;
197 40 : List *attrs_ext = NIL;
198 : int natt_ext;
199 :
200 40 : dstate = (RepackDecodingState *) ctx->output_writer_private;
201 40 : file = dstate->file;
202 :
203 : /* Store the change kind. */
204 40 : BufFileWrite(file, &kind, 1);
205 :
206 : /* Use a frequently-reset context to avoid dealing with leaks manually */
207 40 : oldcxt = MemoryContextSwitchTo(dstate->change_cxt);
208 :
209 : /*
210 : * If the tuple contains "external indirect" attributes, we need to write
211 : * the contents to the file because we have no control over that memory.
212 : */
213 40 : if (HeapTupleHasExternal(tuple))
214 : {
215 13 : TupleDesc desc = RelationGetDescr(relation);
216 : TupleTableSlot *slot;
217 :
218 : /* Initialize the slot, if not done already */
219 13 : if (dstate->slot == NULL)
220 : {
221 : ResourceOwner saveResourceOwner;
222 :
223 1 : MemoryContextSwitchTo(dstate->worker_cxt);
224 1 : saveResourceOwner = CurrentResourceOwner;
225 1 : CurrentResourceOwner = dstate->worker_resowner;
226 1 : dstate->slot = MakeSingleTupleTableSlot(desc, &TTSOpsHeapTuple);
227 1 : MemoryContextSwitchTo(dstate->change_cxt);
228 1 : CurrentResourceOwner = saveResourceOwner;
229 : }
230 :
231 13 : slot = dstate->slot;
232 13 : ExecStoreHeapTuple(tuple, slot, false);
233 :
234 : /*
235 : * Loop over all attributes, and find out which ones we need to spill
236 : * separately, to wit: each one that's a non-null varlena and stored
237 : * out of line.
238 : */
239 78 : for (int i = 0; i < desc->natts; i++)
240 : {
241 65 : CompactAttribute *attr = TupleDescCompactAttr(desc, i);
242 : varlena *varlen;
243 :
244 91 : if (attr->attisdropped || attr->attlen != -1 ||
245 26 : slot_attisnull(slot, i + 1))
246 39 : continue;
247 :
248 26 : slot_getsomeattrs(slot, i + 1);
249 :
250 : /*
251 : * This is a non-null varlena datum, but we only care if it's
252 : * out-of-line
253 : */
254 26 : varlen = (varlena *) DatumGetPointer(slot->tts_values[i]);
255 26 : if (!VARATT_IS_EXTERNAL(varlen))
256 9 : continue;
257 :
258 : /*
259 : * We spill any indirect-external attributes separately from the
260 : * heap tuple. Anything else is written as is.
261 : */
262 17 : if (VARATT_IS_EXTERNAL_INDIRECT(varlen))
263 15 : attrs_ext = lappend(attrs_ext, varlen);
264 : else
265 : {
266 : /*
267 : * Logical decoding should not produce "external expanded"
268 : * attributes (those actually should never appear on disk), so
269 : * only TOASTed attribute can be seen here.
270 : *
271 : * We get here if the table has external values but only
272 : * in-line values are being updated now.
273 : */
274 : Assert(VARATT_IS_EXTERNAL_ONDISK(varlen));
275 : }
276 : }
277 :
278 13 : ExecClearTuple(slot);
279 : }
280 :
281 : /*
282 : * First, write the original heap tuple, prefixed by its length. Note
283 : * that the external-toast tag for each toasted attribute will be present
284 : * in what we write, so that we know where to restore each one later.
285 : */
286 40 : BufFileWrite(file, &tuple->t_len, sizeof(tuple->t_len));
287 40 : BufFileWrite(file, tuple->t_data, tuple->t_len);
288 :
289 : /* Then, write the number of external attributes we found. */
290 40 : natt_ext = list_length(attrs_ext);
291 40 : BufFileWrite(file, &natt_ext, sizeof(natt_ext));
292 :
293 : /* Finally, the attributes themselves, if any */
294 95 : foreach_ptr(varlena, attr_val, attrs_ext)
295 : {
296 15 : attr_val = detoast_external_attr(attr_val);
297 15 : BufFileWrite(file, attr_val, VARSIZE_ANY(attr_val));
298 : /* These attributes could be large, so free them right away */
299 15 : pfree(attr_val);
300 : }
301 :
302 : /* Cleanup. */
303 40 : MemoryContextSwitchTo(oldcxt);
304 40 : MemoryContextReset(dstate->change_cxt);
305 40 : }
|