Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pgrepack.c
4 : * Logical Replication output plugin for REPACK command
5 : *
6 : * Copyright (c) 2026, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/pgrepack/pgrepack.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/detoast.h"
16 : #include "commands/repack_internal.h"
17 : #include "replication/snapbuild.h"
18 : #include "utils/memutils.h"
19 :
20 2 : PG_MODULE_MAGIC;
21 :
22 : static void repack_startup(LogicalDecodingContext *ctx,
23 : OutputPluginOptions *opt, bool is_init);
24 : static void repack_shutdown(LogicalDecodingContext *ctx);
25 : static void repack_begin_txn(LogicalDecodingContext *ctx,
26 : ReorderBufferTXN *txn);
27 : static void repack_commit_txn(LogicalDecodingContext *ctx,
28 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
29 : static void repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
30 : Relation rel, ReorderBufferChange *change);
31 : static void repack_store_change(LogicalDecodingContext *ctx, Relation relation,
32 : ConcurrentChangeKind kind, HeapTuple tuple);
33 :
34 : void
35 2 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
36 : {
37 2 : cb->startup_cb = repack_startup;
38 2 : cb->begin_cb = repack_begin_txn;
39 2 : cb->change_cb = repack_process_change;
40 2 : cb->commit_cb = repack_commit_txn;
41 2 : cb->shutdown_cb = repack_shutdown;
42 2 : }
43 :
44 :
45 : /* initialize this plugin */
46 : static void
47 2 : repack_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
48 : bool is_init)
49 : {
50 2 : ctx->output_plugin_private = NULL;
51 :
52 : /* Probably unnecessary, as we don't use the SQL interface ... */
53 2 : opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
54 :
55 : /*
56 : * REPACK doesn't need access to shared catalogs, so we can speed up the
57 : * historic snapshot creation by setting this flag. We'll only have to
58 : * wait for transactions in our database.
59 : */
60 2 : opt->need_shared_catalogs = false;
61 :
62 2 : if (ctx->output_plugin_options != NIL)
63 : {
64 0 : ereport(ERROR,
65 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
66 : errmsg("this plugin does not expect any options"));
67 : }
68 2 : }
69 :
70 : static void
71 2 : repack_shutdown(LogicalDecodingContext *ctx)
72 : {
73 2 : }
74 :
/*
 * Since we don't release the slot while processing a particular table, there
 * is no room for the SQL interface, not even for debugging purposes.
 * Therefore we need neither OutputPluginPrepareWrite() nor
 * OutputPluginWrite() in the plugin callbacks. (Although we might want to
 * write custom callbacks, that API seems unnecessarily generic for our
 * purposes.)
 */
81 : */
82 :
83 : /* BEGIN callback */
84 : static void
85 7 : repack_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
86 : {
87 7 : }
88 :
89 : /* COMMIT callback */
90 : static void
91 7 : repack_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
92 : XLogRecPtr commit_lsn)
93 : {
94 7 : }
95 :
96 : /*
97 : * Callback for individual changed tuples
98 : */
99 : static void
100 20 : repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
101 : Relation relation, ReorderBufferChange *change)
102 : {
103 20 : RepackDecodingState *private PG_USED_FOR_ASSERTS_ONLY =
104 : (RepackDecodingState *) ctx->output_writer_private;
105 :
106 : /* Changes of other relation should not have been decoded. */
107 : Assert(RelationGetRelid(relation) == private->relid);
108 :
109 : /* Decode entry depending on its type */
110 20 : switch (change->action)
111 : {
112 7 : case REORDER_BUFFER_CHANGE_INSERT:
113 : {
114 : HeapTuple newtuple;
115 :
116 7 : newtuple = change->data.tp.newtuple;
117 :
118 : /*
119 : * Identity checks in the main function should have made this
120 : * impossible.
121 : */
122 7 : if (newtuple == NULL)
123 0 : elog(ERROR, "incomplete insert info");
124 :
125 7 : repack_store_change(ctx, relation, CHANGE_INSERT, newtuple);
126 : }
127 7 : break;
128 10 : case REORDER_BUFFER_CHANGE_UPDATE:
129 : {
130 : HeapTuple oldtuple,
131 : newtuple;
132 :
133 10 : oldtuple = change->data.tp.oldtuple;
134 10 : newtuple = change->data.tp.newtuple;
135 :
136 10 : if (newtuple == NULL)
137 0 : elog(ERROR, "incomplete update info");
138 :
139 10 : if (oldtuple != NULL)
140 7 : repack_store_change(ctx, relation, CHANGE_UPDATE_OLD, oldtuple);
141 :
142 10 : repack_store_change(ctx, relation, CHANGE_UPDATE_NEW, newtuple);
143 : }
144 10 : break;
145 3 : case REORDER_BUFFER_CHANGE_DELETE:
146 : {
147 : HeapTuple oldtuple;
148 :
149 3 : oldtuple = change->data.tp.oldtuple;
150 :
151 3 : if (oldtuple == NULL)
152 0 : elog(ERROR, "incomplete delete info");
153 :
154 3 : repack_store_change(ctx, relation, CHANGE_DELETE, oldtuple);
155 : }
156 3 : break;
157 0 : default:
158 :
159 : /*
160 : * Should not come here. This includes TRUNCATE of the table being
161 : * processed. heap_decode() cannot check the file locator easily,
162 : * but we assume that TRUNCATE uses AccessExclusiveLock on the
163 : * table so it should not occur during REPACK (CONCURRENTLY).
164 : */
165 : Assert(false);
166 0 : break;
167 : }
168 20 : }
169 :
170 : /*
171 : * Write the given tuple, with the given change kind, to the repack spill
172 : * file. Later, the repack decoding worker can read these and replay
173 : * the operations on the new copy of the table.
174 : *
175 : * For each change affecting the table being repacked, we store enough
176 : * information about each tuple in it, so that it can be replayed in the
177 : * new copy of the table.
178 : */
179 : static void
180 27 : repack_store_change(LogicalDecodingContext *ctx, Relation relation,
181 : ConcurrentChangeKind kind, HeapTuple tuple)
182 : {
183 : RepackDecodingState *dstate;
184 : MemoryContext oldcxt;
185 : BufFile *file;
186 27 : List *attrs_ext = NIL;
187 : int natt_ext;
188 :
189 27 : dstate = (RepackDecodingState *) ctx->output_writer_private;
190 27 : file = dstate->file;
191 :
192 : /* Store the change kind. */
193 27 : BufFileWrite(file, &kind, 1);
194 :
195 : /* Use a frequently-reset context to avoid dealing with leaks manually */
196 27 : oldcxt = MemoryContextSwitchTo(dstate->change_cxt);
197 :
198 : /*
199 : * If the tuple contains "external indirect" attributes, we need to write
200 : * the contents to the file because we have no control over that memory.
201 : */
202 27 : if (HeapTupleHasExternal(tuple))
203 : {
204 3 : TupleDesc desc = RelationGetDescr(relation);
205 : TupleTableSlot *slot;
206 :
207 : /* Initialize the slot, if not done already */
208 3 : if (dstate->slot == NULL)
209 : {
210 : ResourceOwner saveResourceOwner;
211 :
212 1 : MemoryContextSwitchTo(dstate->worker_cxt);
213 1 : saveResourceOwner = CurrentResourceOwner;
214 1 : CurrentResourceOwner = dstate->worker_resowner;
215 1 : dstate->slot = MakeSingleTupleTableSlot(desc, &TTSOpsHeapTuple);
216 1 : MemoryContextSwitchTo(dstate->change_cxt);
217 1 : CurrentResourceOwner = saveResourceOwner;
218 : }
219 :
220 3 : slot = dstate->slot;
221 3 : ExecStoreHeapTuple(tuple, slot, false);
222 :
223 : /*
224 : * Loop over all attributes, and find out which ones we need to spill
225 : * separately, to wit: each one that's a non-null varlena and stored
226 : * out of line.
227 : */
228 9 : for (int i = 0; i < desc->natts; i++)
229 : {
230 6 : CompactAttribute *attr = TupleDescCompactAttr(desc, i);
231 : varlena *varlen;
232 :
233 9 : if (attr->attisdropped || attr->attlen != -1 ||
234 3 : slot_attisnull(slot, i + 1))
235 3 : continue;
236 :
237 3 : slot_getsomeattrs(slot, i + 1);
238 :
239 : /*
240 : * This is a non-null varlena datum, but we only care if it's
241 : * out-of-line
242 : */
243 3 : varlen = (varlena *) DatumGetPointer(slot->tts_values[i]);
244 3 : if (!VARATT_IS_EXTERNAL(varlen))
245 0 : continue;
246 :
247 : /*
248 : * We spill any indirect-external attributes separately from the
249 : * heap tuple. Anything else is written as is.
250 : */
251 3 : if (VARATT_IS_EXTERNAL_INDIRECT(varlen))
252 2 : attrs_ext = lappend(attrs_ext, varlen);
253 : else
254 : {
255 : /*
256 : * Logical decoding should not produce "external expanded"
257 : * attributes (those actually should never appear on disk), so
258 : * only TOASTed attribute can be seen here.
259 : *
260 : * We get here if the table has external values but only
261 : * in-line values are being updated now.
262 : */
263 : Assert(VARATT_IS_EXTERNAL_ONDISK(varlen));
264 : }
265 : }
266 :
267 3 : ExecClearTuple(slot);
268 : }
269 :
270 : /*
271 : * First, write the original heap tuple, prefixed by its length. Note
272 : * that the external-toast tag for each toasted attribute will be present
273 : * in what we write, so that we know where to restore each one later.
274 : */
275 27 : BufFileWrite(file, &tuple->t_len, sizeof(tuple->t_len));
276 27 : BufFileWrite(file, tuple->t_data, tuple->t_len);
277 :
278 : /* Then, write the number of external attributes we found. */
279 27 : natt_ext = list_length(attrs_ext);
280 27 : BufFileWrite(file, &natt_ext, sizeof(natt_ext));
281 :
282 : /* Finally, the attributes themselves, if any */
283 56 : foreach_ptr(varlena, attr_val, attrs_ext)
284 : {
285 2 : attr_val = detoast_external_attr(attr_val);
286 2 : BufFileWrite(file, attr_val, VARSIZE_ANY(attr_val));
287 : /* These attributes could be large, so free them right away */
288 2 : pfree(attr_val);
289 : }
290 :
291 : /* Cleanup. */
292 27 : MemoryContextSwitchTo(oldcxt);
293 27 : MemoryContextReset(dstate->change_cxt);
294 27 : }
|