Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_subscription.c
4 : * replication subscriptions
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/catalog/pg_subscription.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/heapam.h"
19 : #include "access/htup_details.h"
20 : #include "access/tableam.h"
21 : #include "catalog/indexing.h"
22 : #include "catalog/pg_subscription.h"
23 : #include "catalog/pg_subscription_rel.h"
24 : #include "catalog/pg_type.h"
25 : #include "miscadmin.h"
26 : #include "storage/lmgr.h"
27 : #include "utils/array.h"
28 : #include "utils/builtins.h"
29 : #include "utils/fmgroids.h"
30 : #include "utils/lsyscache.h"
31 : #include "utils/pg_lsn.h"
32 : #include "utils/rel.h"
33 : #include "utils/syscache.h"
34 :
35 : static List *textarray_to_stringlist(ArrayType *textarray);
36 :
37 : /*
38 : * Add a comma-separated list of publication names to the 'dest' string.
39 : */
40 : void
41 894 : GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
42 : {
43 : ListCell *lc;
44 894 : bool first = true;
45 :
46 : Assert(publications != NIL);
47 :
48 2316 : foreach(lc, publications)
49 : {
50 1422 : char *pubname = strVal(lfirst(lc));
51 :
52 1422 : if (first)
53 894 : first = false;
54 : else
55 528 : appendStringInfoString(dest, ", ");
56 :
57 1422 : if (quote_literal)
58 1410 : appendStringInfoString(dest, quote_literal_cstr(pubname));
59 : else
60 : {
61 12 : appendStringInfoChar(dest, '"');
62 12 : appendStringInfoString(dest, pubname);
63 12 : appendStringInfoChar(dest, '"');
64 : }
65 : }
66 894 : }
67 :
68 : /*
69 : * Fetch the subscription from the syscache.
70 : */
71 : Subscription *
72 1374 : GetSubscription(Oid subid, bool missing_ok)
73 : {
74 : HeapTuple tup;
75 : Subscription *sub;
76 : Form_pg_subscription subform;
77 : Datum datum;
78 : bool isnull;
79 :
80 1374 : tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
81 :
82 1372 : if (!HeapTupleIsValid(tup))
83 : {
84 0 : if (missing_ok)
85 0 : return NULL;
86 :
87 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
88 : }
89 :
90 1372 : subform = (Form_pg_subscription) GETSTRUCT(tup);
91 :
92 1372 : sub = (Subscription *) palloc(sizeof(Subscription));
93 1372 : sub->oid = subid;
94 1372 : sub->dbid = subform->subdbid;
95 1372 : sub->skiplsn = subform->subskiplsn;
96 1372 : sub->name = pstrdup(NameStr(subform->subname));
97 1372 : sub->owner = subform->subowner;
98 1372 : sub->enabled = subform->subenabled;
99 1372 : sub->binary = subform->subbinary;
100 1372 : sub->stream = subform->substream;
101 1372 : sub->twophasestate = subform->subtwophasestate;
102 1372 : sub->disableonerr = subform->subdisableonerr;
103 1372 : sub->passwordrequired = subform->subpasswordrequired;
104 1372 : sub->runasowner = subform->subrunasowner;
105 1372 : sub->failover = subform->subfailover;
106 :
107 : /* Get conninfo */
108 1372 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
109 : tup,
110 : Anum_pg_subscription_subconninfo);
111 1372 : sub->conninfo = TextDatumGetCString(datum);
112 :
113 : /* Get slotname */
114 1372 : datum = SysCacheGetAttr(SUBSCRIPTIONOID,
115 : tup,
116 : Anum_pg_subscription_subslotname,
117 : &isnull);
118 1372 : if (!isnull)
119 1306 : sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
120 : else
121 66 : sub->slotname = NULL;
122 :
123 : /* Get synccommit */
124 1372 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
125 : tup,
126 : Anum_pg_subscription_subsynccommit);
127 1372 : sub->synccommit = TextDatumGetCString(datum);
128 :
129 : /* Get publications */
130 1372 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
131 : tup,
132 : Anum_pg_subscription_subpublications);
133 1372 : sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
134 :
135 : /* Get origin */
136 1372 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
137 : tup,
138 : Anum_pg_subscription_suborigin);
139 1372 : sub->origin = TextDatumGetCString(datum);
140 :
141 : /* Is the subscription owner a superuser? */
142 1372 : sub->ownersuperuser = superuser_arg(sub->owner);
143 :
144 1372 : ReleaseSysCache(tup);
145 :
146 1372 : return sub;
147 : }
148 :
149 : /*
150 : * Return number of subscriptions defined in given database.
151 : * Used by dropdb() to check if database can indeed be dropped.
152 : */
153 : int
154 68 : CountDBSubscriptions(Oid dbid)
155 : {
156 68 : int nsubs = 0;
157 : Relation rel;
158 : ScanKeyData scankey;
159 : SysScanDesc scan;
160 : HeapTuple tup;
161 :
162 68 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
163 :
164 68 : ScanKeyInit(&scankey,
165 : Anum_pg_subscription_subdbid,
166 : BTEqualStrategyNumber, F_OIDEQ,
167 : ObjectIdGetDatum(dbid));
168 :
169 68 : scan = systable_beginscan(rel, InvalidOid, false,
170 : NULL, 1, &scankey);
171 :
172 68 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
173 0 : nsubs++;
174 :
175 68 : systable_endscan(scan);
176 :
177 68 : table_close(rel, NoLock);
178 :
179 68 : return nsubs;
180 : }
181 :
182 : /*
183 : * Free memory allocated by subscription struct.
184 : */
185 : void
186 66 : FreeSubscription(Subscription *sub)
187 : {
188 66 : pfree(sub->name);
189 66 : pfree(sub->conninfo);
190 66 : if (sub->slotname)
191 66 : pfree(sub->slotname);
192 66 : list_free_deep(sub->publications);
193 66 : pfree(sub);
194 66 : }
195 :
196 : /*
197 : * Disable the given subscription.
198 : */
199 : void
200 8 : DisableSubscription(Oid subid)
201 : {
202 : Relation rel;
203 : bool nulls[Natts_pg_subscription];
204 : bool replaces[Natts_pg_subscription];
205 : Datum values[Natts_pg_subscription];
206 : HeapTuple tup;
207 :
208 : /* Look up the subscription in the catalog */
209 8 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
210 8 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
211 :
212 8 : if (!HeapTupleIsValid(tup))
213 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
214 :
215 8 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
216 :
217 : /* Form a new tuple. */
218 8 : memset(values, 0, sizeof(values));
219 8 : memset(nulls, false, sizeof(nulls));
220 8 : memset(replaces, false, sizeof(replaces));
221 :
222 : /* Set the subscription to disabled. */
223 8 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
224 8 : replaces[Anum_pg_subscription_subenabled - 1] = true;
225 :
226 : /* Update the catalog */
227 8 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
228 : replaces);
229 8 : CatalogTupleUpdate(rel, &tup->t_self, tup);
230 8 : heap_freetuple(tup);
231 :
232 8 : table_close(rel, NoLock);
233 8 : }
234 :
235 : /*
236 : * Convert text array to list of strings.
237 : *
238 : * Note: the resulting list of strings is pallocated here.
239 : */
240 : static List *
241 1372 : textarray_to_stringlist(ArrayType *textarray)
242 : {
243 : Datum *elems;
244 : int nelems,
245 : i;
246 1372 : List *res = NIL;
247 :
248 1372 : deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
249 :
250 1372 : if (nelems == 0)
251 0 : return NIL;
252 :
253 3404 : for (i = 0; i < nelems; i++)
254 2032 : res = lappend(res, makeString(TextDatumGetCString(elems[i])));
255 :
256 1372 : return res;
257 : }
258 :
259 : /*
260 : * Add new state record for a subscription table.
261 : *
262 : * If retain_lock is true, then don't release the locks taken in this function.
263 : * We normally release the locks at the end of transaction but in binary-upgrade
264 : * mode, we expect to release those immediately.
265 : */
266 : void
267 380 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
268 : XLogRecPtr sublsn, bool retain_lock)
269 : {
270 : Relation rel;
271 : HeapTuple tup;
272 : bool nulls[Natts_pg_subscription_rel];
273 : Datum values[Natts_pg_subscription_rel];
274 :
275 380 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
276 :
277 380 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
278 :
279 : /* Try finding existing mapping. */
280 380 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
281 : ObjectIdGetDatum(relid),
282 : ObjectIdGetDatum(subid));
283 380 : if (HeapTupleIsValid(tup))
284 0 : elog(ERROR, "subscription table %u in subscription %u already exists",
285 : relid, subid);
286 :
287 : /* Form the tuple. */
288 380 : memset(values, 0, sizeof(values));
289 380 : memset(nulls, false, sizeof(nulls));
290 380 : values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
291 380 : values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
292 380 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
293 380 : if (sublsn != InvalidXLogRecPtr)
294 2 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
295 : else
296 378 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
297 :
298 380 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
299 :
300 : /* Insert tuple into catalog. */
301 380 : CatalogTupleInsert(rel, tup);
302 :
303 380 : heap_freetuple(tup);
304 :
305 : /* Cleanup. */
306 380 : if (retain_lock)
307 : {
308 376 : table_close(rel, NoLock);
309 : }
310 : else
311 : {
312 4 : table_close(rel, RowExclusiveLock);
313 4 : UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
314 : }
315 380 : }
316 :
317 : /*
318 : * Update the state of a subscription table.
319 : */
320 : void
321 1404 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
322 : XLogRecPtr sublsn)
323 : {
324 : Relation rel;
325 : HeapTuple tup;
326 : bool nulls[Natts_pg_subscription_rel];
327 : Datum values[Natts_pg_subscription_rel];
328 : bool replaces[Natts_pg_subscription_rel];
329 :
330 1404 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
331 :
332 1402 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
333 :
334 : /* Try finding existing mapping. */
335 1402 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
336 : ObjectIdGetDatum(relid),
337 : ObjectIdGetDatum(subid));
338 1402 : if (!HeapTupleIsValid(tup))
339 0 : elog(ERROR, "subscription table %u in subscription %u does not exist",
340 : relid, subid);
341 :
342 : /* Update the tuple. */
343 1402 : memset(values, 0, sizeof(values));
344 1402 : memset(nulls, false, sizeof(nulls));
345 1402 : memset(replaces, false, sizeof(replaces));
346 :
347 1402 : replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
348 1402 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
349 :
350 1402 : replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
351 1402 : if (sublsn != InvalidXLogRecPtr)
352 688 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
353 : else
354 714 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
355 :
356 1402 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
357 : replaces);
358 :
359 : /* Update the catalog. */
360 1402 : CatalogTupleUpdate(rel, &tup->t_self, tup);
361 :
362 : /* Cleanup. */
363 1402 : table_close(rel, NoLock);
364 1402 : }
365 :
366 : /*
367 : * Get state of subscription table.
368 : *
369 : * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
370 : */
371 : char
372 2180 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
373 : {
374 : HeapTuple tup;
375 : char substate;
376 : bool isnull;
377 : Datum d;
378 : Relation rel;
379 :
380 : /*
381 : * This is to avoid the race condition with AlterSubscription which tries
382 : * to remove this relstate.
383 : */
384 2180 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
385 :
386 : /* Try finding the mapping. */
387 2180 : tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
388 : ObjectIdGetDatum(relid),
389 : ObjectIdGetDatum(subid));
390 :
391 2180 : if (!HeapTupleIsValid(tup))
392 : {
393 54 : table_close(rel, AccessShareLock);
394 54 : *sublsn = InvalidXLogRecPtr;
395 54 : return SUBREL_STATE_UNKNOWN;
396 : }
397 :
398 : /* Get the state. */
399 2126 : substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
400 :
401 : /* Get the LSN */
402 2126 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
403 : Anum_pg_subscription_rel_srsublsn, &isnull);
404 2126 : if (isnull)
405 1154 : *sublsn = InvalidXLogRecPtr;
406 : else
407 972 : *sublsn = DatumGetLSN(d);
408 :
409 : /* Cleanup */
410 2126 : ReleaseSysCache(tup);
411 :
412 2126 : table_close(rel, AccessShareLock);
413 :
414 2126 : return substate;
415 : }
416 :
417 : /*
418 : * Drop subscription relation mapping. These can be for a particular
419 : * subscription, or for a particular relation, or both.
420 : */
421 : void
422 45678 : RemoveSubscriptionRel(Oid subid, Oid relid)
423 : {
424 : Relation rel;
425 : TableScanDesc scan;
426 : ScanKeyData skey[2];
427 : HeapTuple tup;
428 45678 : int nkeys = 0;
429 :
430 45678 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
431 :
432 45678 : if (OidIsValid(subid))
433 : {
434 232 : ScanKeyInit(&skey[nkeys++],
435 : Anum_pg_subscription_rel_srsubid,
436 : BTEqualStrategyNumber,
437 : F_OIDEQ,
438 : ObjectIdGetDatum(subid));
439 : }
440 :
441 45678 : if (OidIsValid(relid))
442 : {
443 45482 : ScanKeyInit(&skey[nkeys++],
444 : Anum_pg_subscription_rel_srrelid,
445 : BTEqualStrategyNumber,
446 : F_OIDEQ,
447 : ObjectIdGetDatum(relid));
448 : }
449 :
450 : /* Do the search and delete what we found. */
451 45678 : scan = table_beginscan_catalog(rel, nkeys, skey);
452 45884 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
453 : {
454 : Form_pg_subscription_rel subrel;
455 :
456 206 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
457 :
458 : /*
459 : * We don't allow to drop the relation mapping when the table
460 : * synchronization is in progress unless the caller updates the
461 : * corresponding subscription as well. This is to ensure that we don't
462 : * leave tablesync slots or origins in the system when the
463 : * corresponding table is dropped.
464 : */
465 206 : if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
466 : {
467 0 : ereport(ERROR,
468 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
469 : errmsg("could not drop relation mapping for subscription \"%s\"",
470 : get_subscription_name(subrel->srsubid, false)),
471 : errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
472 : get_rel_name(relid), subrel->srsubstate),
473 :
474 : /*
475 : * translator: first %s is a SQL ALTER command and second %s is a
476 : * SQL DROP command
477 : */
478 : errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
479 : "ALTER SUBSCRIPTION ... ENABLE",
480 : "DROP SUBSCRIPTION ...")));
481 : }
482 :
483 206 : CatalogTupleDelete(rel, &tup->t_self);
484 : }
485 45678 : table_endscan(scan);
486 :
487 45678 : table_close(rel, RowExclusiveLock);
488 45678 : }
489 :
490 : /*
491 : * Does the subscription have any relations?
492 : *
493 : * Use this function only to know true/false, and when you have no need for the
494 : * List returned by GetSubscriptionRelations.
495 : */
496 : bool
497 420 : HasSubscriptionRelations(Oid subid)
498 : {
499 : Relation rel;
500 : ScanKeyData skey[1];
501 : SysScanDesc scan;
502 : bool has_subrels;
503 :
504 420 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
505 :
506 420 : ScanKeyInit(&skey[0],
507 : Anum_pg_subscription_rel_srsubid,
508 : BTEqualStrategyNumber, F_OIDEQ,
509 : ObjectIdGetDatum(subid));
510 :
511 420 : scan = systable_beginscan(rel, InvalidOid, false,
512 : NULL, 1, skey);
513 :
514 : /* If even a single tuple exists then the subscription has tables. */
515 420 : has_subrels = HeapTupleIsValid(systable_getnext(scan));
516 :
517 : /* Cleanup */
518 420 : systable_endscan(scan);
519 420 : table_close(rel, AccessShareLock);
520 :
521 420 : return has_subrels;
522 : }
523 :
524 : /*
525 : * Get the relations for the subscription.
526 : *
527 : * If not_ready is true, return only the relations that are not in a ready
528 : * state, otherwise return all the relations of the subscription. The
529 : * returned list is palloc'ed in the current memory context.
530 : */
531 : List *
532 1840 : GetSubscriptionRelations(Oid subid, bool not_ready)
533 : {
534 1840 : List *res = NIL;
535 : Relation rel;
536 : HeapTuple tup;
537 1840 : int nkeys = 0;
538 : ScanKeyData skey[2];
539 : SysScanDesc scan;
540 :
541 1840 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
542 :
543 1840 : ScanKeyInit(&skey[nkeys++],
544 : Anum_pg_subscription_rel_srsubid,
545 : BTEqualStrategyNumber, F_OIDEQ,
546 : ObjectIdGetDatum(subid));
547 :
548 1840 : if (not_ready)
549 1784 : ScanKeyInit(&skey[nkeys++],
550 : Anum_pg_subscription_rel_srsubstate,
551 : BTEqualStrategyNumber, F_CHARNE,
552 : CharGetDatum(SUBREL_STATE_READY));
553 :
554 1840 : scan = systable_beginscan(rel, InvalidOid, false,
555 : NULL, nkeys, skey);
556 :
557 4560 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
558 : {
559 : Form_pg_subscription_rel subrel;
560 : SubscriptionRelState *relstate;
561 : Datum d;
562 : bool isnull;
563 :
564 2720 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
565 :
566 2720 : relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
567 2720 : relstate->relid = subrel->srrelid;
568 2720 : relstate->state = subrel->srsubstate;
569 2720 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
570 : Anum_pg_subscription_rel_srsublsn, &isnull);
571 2720 : if (isnull)
572 2214 : relstate->lsn = InvalidXLogRecPtr;
573 : else
574 506 : relstate->lsn = DatumGetLSN(d);
575 :
576 2720 : res = lappend(res, relstate);
577 : }
578 :
579 : /* Cleanup */
580 1840 : systable_endscan(scan);
581 1840 : table_close(rel, AccessShareLock);
582 :
583 1840 : return res;
584 : }
|