Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pgrepack.c
4 : * Logical Replication output plugin for REPACK command
5 : *
6 : * Copyright (c) 2026, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/pgrepack/pgrepack.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/detoast.h"
16 : #include "commands/repack_internal.h"
17 : #include "replication/snapbuild.h"
18 : #include "utils/memutils.h"
19 :
20 6 : PG_MODULE_MAGIC;
21 :
22 : static void repack_startup(LogicalDecodingContext *ctx,
23 : OutputPluginOptions *opt, bool is_init);
24 : static void repack_shutdown(LogicalDecodingContext *ctx);
25 : static void repack_begin_txn(LogicalDecodingContext *ctx,
26 : ReorderBufferTXN *txn);
27 : static void repack_commit_txn(LogicalDecodingContext *ctx,
28 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
29 : static void repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
30 : Relation relation, ReorderBufferChange *change);
31 : static void repack_store_change(LogicalDecodingContext *ctx, Relation relation,
32 : ConcurrentChangeKind kind, HeapTuple tuple);
33 :
34 : void
35 6 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
36 : {
37 6 : cb->startup_cb = repack_startup;
38 6 : cb->begin_cb = repack_begin_txn;
39 6 : cb->change_cb = repack_process_change;
40 6 : cb->commit_cb = repack_commit_txn;
41 6 : cb->shutdown_cb = repack_shutdown;
42 6 : }
43 :
44 :
45 : /* initialize this plugin */
46 : static void
47 6 : repack_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
48 : bool is_init)
49 : {
50 6 : ctx->output_plugin_private = NULL;
51 :
52 : /* Probably unnecessary, as we don't use the SQL interface ... */
53 6 : opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
54 :
55 6 : if (ctx->output_plugin_options != NIL)
56 : {
57 0 : ereport(ERROR,
58 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
59 : errmsg("this plugin does not expect any options"));
60 : }
61 6 : }
62 :
63 : static void
64 6 : repack_shutdown(LogicalDecodingContext *ctx)
65 : {
66 6 : }
67 :
/*
 * Since we don't release the replication slot while a particular table is
 * being processed, there is no opportunity to use the SQL interface, not
 * even for debugging purposes.  Therefore the plugin callbacks need neither
 * OutputPluginPrepareWrite() nor OutputPluginWrite().  (We could write
 * custom callbacks for those, but that API seems unnecessarily generic for
 * our purposes.)
 */
75 :
76 : /* BEGIN callback */
77 : static void
78 11 : repack_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
79 : {
80 11 : }
81 :
82 : /* COMMIT callback */
83 : static void
84 11 : repack_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
85 : XLogRecPtr commit_lsn)
86 : {
87 11 : }
88 :
89 : /*
90 : * Callback for individual changed tuples
91 : */
92 : static void
93 32 : repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
94 : Relation relation, ReorderBufferChange *change)
95 : {
96 32 : RepackDecodingState *private PG_USED_FOR_ASSERTS_ONLY =
97 : (RepackDecodingState *) ctx->output_writer_private;
98 :
99 : /* Changes of other relation should not have been decoded. */
100 : Assert(RelationGetRelid(relation) == private->relid);
101 :
102 : /* Decode entry depending on its type */
103 32 : switch (change->action)
104 : {
105 7 : case REORDER_BUFFER_CHANGE_INSERT:
106 : {
107 : HeapTuple newtuple;
108 :
109 7 : newtuple = change->data.tp.newtuple;
110 :
111 : /*
112 : * Identity checks in the main function should have made this
113 : * impossible.
114 : */
115 7 : if (newtuple == NULL)
116 0 : elog(ERROR, "incomplete insert info");
117 :
118 7 : repack_store_change(ctx, relation, CHANGE_INSERT, newtuple);
119 : }
120 7 : break;
121 22 : case REORDER_BUFFER_CHANGE_UPDATE:
122 : {
123 : HeapTuple oldtuple,
124 : newtuple;
125 :
126 22 : oldtuple = change->data.tp.oldtuple;
127 22 : newtuple = change->data.tp.newtuple;
128 :
129 22 : if (newtuple == NULL)
130 0 : elog(ERROR, "incomplete update info");
131 :
132 22 : if (oldtuple != NULL)
133 8 : repack_store_change(ctx, relation, CHANGE_UPDATE_OLD, oldtuple);
134 :
135 22 : repack_store_change(ctx, relation, CHANGE_UPDATE_NEW, newtuple);
136 : }
137 22 : break;
138 3 : case REORDER_BUFFER_CHANGE_DELETE:
139 : {
140 : HeapTuple oldtuple;
141 :
142 3 : oldtuple = change->data.tp.oldtuple;
143 :
144 3 : if (oldtuple == NULL)
145 0 : elog(ERROR, "incomplete delete info");
146 :
147 3 : repack_store_change(ctx, relation, CHANGE_DELETE, oldtuple);
148 : }
149 3 : break;
150 0 : default:
151 :
152 : /*
153 : * Should not come here. This includes TRUNCATE of the table being
154 : * processed. heap_decode() cannot check the file locator easily,
155 : * but we assume that TRUNCATE uses AccessExclusiveLock on the
156 : * table so it should not occur during REPACK (CONCURRENTLY).
157 : */
158 : Assert(false);
159 0 : break;
160 : }
161 32 : }
162 :
163 : /*
164 : * Write the given tuple, with the given change kind, to the repack spill
165 : * file. Later, the repack decoding worker can read these and replay
166 : * the operations on the new copy of the table.
167 : *
168 : * For each change affecting the table being repacked, we store enough
169 : * information about each tuple in it, so that it can be replayed in the
170 : * new copy of the table.
171 : */
172 : static void
173 40 : repack_store_change(LogicalDecodingContext *ctx, Relation relation,
174 : ConcurrentChangeKind kind, HeapTuple tuple)
175 : {
176 : RepackDecodingState *dstate;
177 : MemoryContext oldcxt;
178 : BufFile *file;
179 40 : List *attrs_ext = NIL;
180 : int natt_ext;
181 :
182 40 : dstate = (RepackDecodingState *) ctx->output_writer_private;
183 40 : file = dstate->file;
184 :
185 : /* Store the change kind. */
186 40 : BufFileWrite(file, &kind, 1);
187 :
188 : /* Use a frequently-reset context to avoid dealing with leaks manually */
189 40 : oldcxt = MemoryContextSwitchTo(dstate->change_cxt);
190 :
191 : /*
192 : * If the tuple contains "external indirect" attributes, we need to write
193 : * the contents to the file because we have no control over that memory.
194 : */
195 40 : if (HeapTupleHasExternal(tuple))
196 : {
197 13 : TupleDesc desc = RelationGetDescr(relation);
198 : TupleTableSlot *slot;
199 :
200 : /* Initialize the slot, if not done already */
201 13 : if (dstate->slot == NULL)
202 : {
203 : ResourceOwner saveResourceOwner;
204 :
205 1 : MemoryContextSwitchTo(dstate->worker_cxt);
206 1 : saveResourceOwner = CurrentResourceOwner;
207 1 : CurrentResourceOwner = dstate->worker_resowner;
208 1 : dstate->slot = MakeSingleTupleTableSlot(desc, &TTSOpsHeapTuple);
209 1 : MemoryContextSwitchTo(dstate->change_cxt);
210 1 : CurrentResourceOwner = saveResourceOwner;
211 : }
212 :
213 13 : slot = dstate->slot;
214 13 : ExecStoreHeapTuple(tuple, slot, false);
215 :
216 : /*
217 : * Loop over all attributes, and find out which ones we need to spill
218 : * separately, to wit: each one that's a non-null varlena and stored
219 : * out of line.
220 : */
221 78 : for (int i = 0; i < desc->natts; i++)
222 : {
223 65 : CompactAttribute *attr = TupleDescCompactAttr(desc, i);
224 : varlena *varlen;
225 :
226 91 : if (attr->attisdropped || attr->attlen != -1 ||
227 26 : slot_attisnull(slot, i + 1))
228 39 : continue;
229 :
230 26 : slot_getsomeattrs(slot, i + 1);
231 :
232 : /*
233 : * This is a non-null varlena datum, but we only care if it's
234 : * out-of-line
235 : */
236 26 : varlen = (varlena *) DatumGetPointer(slot->tts_values[i]);
237 26 : if (!VARATT_IS_EXTERNAL(varlen))
238 9 : continue;
239 :
240 : /*
241 : * We spill any indirect-external attributes separately from the
242 : * heap tuple. Anything else is written as is.
243 : */
244 17 : if (VARATT_IS_EXTERNAL_INDIRECT(varlen))
245 15 : attrs_ext = lappend(attrs_ext, varlen);
246 : else
247 : {
248 : /*
249 : * Logical decoding should not produce "external expanded"
250 : * attributes (those actually should never appear on disk), so
251 : * only TOASTed attribute can be seen here.
252 : *
253 : * We get here if the table has external values but only
254 : * in-line values are being updated now.
255 : */
256 : Assert(VARATT_IS_EXTERNAL_ONDISK(varlen));
257 : }
258 : }
259 :
260 13 : ExecClearTuple(slot);
261 : }
262 :
263 : /*
264 : * First, write the original heap tuple, prefixed by its length. Note
265 : * that the external-toast tag for each toasted attribute will be present
266 : * in what we write, so that we know where to restore each one later.
267 : */
268 40 : BufFileWrite(file, &tuple->t_len, sizeof(tuple->t_len));
269 40 : BufFileWrite(file, tuple->t_data, tuple->t_len);
270 :
271 : /* Then, write the number of external attributes we found. */
272 40 : natt_ext = list_length(attrs_ext);
273 40 : BufFileWrite(file, &natt_ext, sizeof(natt_ext));
274 :
275 : /* Finally, the attributes themselves, if any */
276 95 : foreach_ptr(varlena, attr_val, attrs_ext)
277 : {
278 15 : attr_val = detoast_external_attr(attr_val);
279 15 : BufFileWrite(file, attr_val, VARSIZE_ANY(attr_val));
280 : /* These attributes could be large, so free them right away */
281 15 : pfree(attr_val);
282 : }
283 :
284 : /* Cleanup. */
285 40 : MemoryContextSwitchTo(oldcxt);
286 40 : MemoryContextReset(dstate->change_cxt);
287 40 : }
|