Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_subscription.c
4 : * replication subscriptions
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/catalog/pg_subscription.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/heapam.h"
19 : #include "access/htup_details.h"
20 : #include "access/tableam.h"
21 : #include "catalog/indexing.h"
22 : #include "catalog/pg_subscription.h"
23 : #include "catalog/pg_subscription_rel.h"
24 : #include "catalog/pg_type.h"
25 : #include "miscadmin.h"
26 : #include "storage/lmgr.h"
27 : #include "utils/array.h"
28 : #include "utils/builtins.h"
29 : #include "utils/fmgroids.h"
30 : #include "utils/lsyscache.h"
31 : #include "utils/pg_lsn.h"
32 : #include "utils/rel.h"
33 : #include "utils/syscache.h"
34 :
35 : static List *textarray_to_stringlist(ArrayType *textarray);
36 :
37 : /*
38 : * Add a comma-separated list of publication names to the 'dest' string.
39 : */
40 : void
41 984 : GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
42 : {
43 : ListCell *lc;
44 984 : bool first = true;
45 :
46 : Assert(publications != NIL);
47 :
48 2554 : foreach(lc, publications)
49 : {
50 1570 : char *pubname = strVal(lfirst(lc));
51 :
52 1570 : if (first)
53 984 : first = false;
54 : else
55 586 : appendStringInfoString(dest, ", ");
56 :
57 1570 : if (quote_literal)
58 1550 : appendStringInfoString(dest, quote_literal_cstr(pubname));
59 : else
60 : {
61 20 : appendStringInfoChar(dest, '"');
62 20 : appendStringInfoString(dest, pubname);
63 20 : appendStringInfoChar(dest, '"');
64 : }
65 : }
66 984 : }
67 :
68 : /*
69 : * Fetch the subscription from the syscache.
70 : */
71 : Subscription *
72 1542 : GetSubscription(Oid subid, bool missing_ok)
73 : {
74 : HeapTuple tup;
75 : Subscription *sub;
76 : Form_pg_subscription subform;
77 : Datum datum;
78 : bool isnull;
79 :
80 1542 : tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
81 :
82 1540 : if (!HeapTupleIsValid(tup))
83 : {
84 0 : if (missing_ok)
85 0 : return NULL;
86 :
87 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
88 : }
89 :
90 1540 : subform = (Form_pg_subscription) GETSTRUCT(tup);
91 :
92 1540 : sub = (Subscription *) palloc(sizeof(Subscription));
93 1540 : sub->oid = subid;
94 1540 : sub->dbid = subform->subdbid;
95 1540 : sub->skiplsn = subform->subskiplsn;
96 1540 : sub->name = pstrdup(NameStr(subform->subname));
97 1540 : sub->owner = subform->subowner;
98 1540 : sub->enabled = subform->subenabled;
99 1540 : sub->binary = subform->subbinary;
100 1540 : sub->stream = subform->substream;
101 1540 : sub->twophasestate = subform->subtwophasestate;
102 1540 : sub->disableonerr = subform->subdisableonerr;
103 1540 : sub->passwordrequired = subform->subpasswordrequired;
104 1540 : sub->runasowner = subform->subrunasowner;
105 1540 : sub->failover = subform->subfailover;
106 1540 : sub->retaindeadtuples = subform->subretaindeadtuples;
107 :
108 : /* Get conninfo */
109 1540 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
110 : tup,
111 : Anum_pg_subscription_subconninfo);
112 1540 : sub->conninfo = TextDatumGetCString(datum);
113 :
114 : /* Get slotname */
115 1540 : datum = SysCacheGetAttr(SUBSCRIPTIONOID,
116 : tup,
117 : Anum_pg_subscription_subslotname,
118 : &isnull);
119 1540 : if (!isnull)
120 1474 : sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
121 : else
122 66 : sub->slotname = NULL;
123 :
124 : /* Get synccommit */
125 1540 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
126 : tup,
127 : Anum_pg_subscription_subsynccommit);
128 1540 : sub->synccommit = TextDatumGetCString(datum);
129 :
130 : /* Get publications */
131 1540 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
132 : tup,
133 : Anum_pg_subscription_subpublications);
134 1540 : sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
135 :
136 : /* Get origin */
137 1540 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
138 : tup,
139 : Anum_pg_subscription_suborigin);
140 1540 : sub->origin = TextDatumGetCString(datum);
141 :
142 : /* Is the subscription owner a superuser? */
143 1540 : sub->ownersuperuser = superuser_arg(sub->owner);
144 :
145 1540 : ReleaseSysCache(tup);
146 :
147 1540 : return sub;
148 : }
149 :
150 : /*
151 : * Return number of subscriptions defined in given database.
152 : * Used by dropdb() to check if database can indeed be dropped.
153 : */
154 : int
155 88 : CountDBSubscriptions(Oid dbid)
156 : {
157 88 : int nsubs = 0;
158 : Relation rel;
159 : ScanKeyData scankey;
160 : SysScanDesc scan;
161 : HeapTuple tup;
162 :
163 88 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
164 :
165 88 : ScanKeyInit(&scankey,
166 : Anum_pg_subscription_subdbid,
167 : BTEqualStrategyNumber, F_OIDEQ,
168 : ObjectIdGetDatum(dbid));
169 :
170 88 : scan = systable_beginscan(rel, InvalidOid, false,
171 : NULL, 1, &scankey);
172 :
173 88 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
174 0 : nsubs++;
175 :
176 88 : systable_endscan(scan);
177 :
178 88 : table_close(rel, NoLock);
179 :
180 88 : return nsubs;
181 : }
182 :
183 : /*
184 : * Free memory allocated by subscription struct.
185 : */
186 : void
187 76 : FreeSubscription(Subscription *sub)
188 : {
189 76 : pfree(sub->name);
190 76 : pfree(sub->conninfo);
191 76 : if (sub->slotname)
192 76 : pfree(sub->slotname);
193 76 : list_free_deep(sub->publications);
194 76 : pfree(sub);
195 76 : }
196 :
197 : /*
198 : * Disable the given subscription.
199 : */
200 : void
201 8 : DisableSubscription(Oid subid)
202 : {
203 : Relation rel;
204 : bool nulls[Natts_pg_subscription];
205 : bool replaces[Natts_pg_subscription];
206 : Datum values[Natts_pg_subscription];
207 : HeapTuple tup;
208 :
209 : /* Look up the subscription in the catalog */
210 8 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
211 8 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
212 :
213 8 : if (!HeapTupleIsValid(tup))
214 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
215 :
216 8 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
217 :
218 : /* Form a new tuple. */
219 8 : memset(values, 0, sizeof(values));
220 8 : memset(nulls, false, sizeof(nulls));
221 8 : memset(replaces, false, sizeof(replaces));
222 :
223 : /* Set the subscription to disabled. */
224 8 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
225 8 : replaces[Anum_pg_subscription_subenabled - 1] = true;
226 :
227 : /* Update the catalog */
228 8 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
229 : replaces);
230 8 : CatalogTupleUpdate(rel, &tup->t_self, tup);
231 8 : heap_freetuple(tup);
232 :
233 8 : table_close(rel, NoLock);
234 8 : }
235 :
236 : /*
237 : * Convert text array to list of strings.
238 : *
239 : * Note: the resulting list of strings is pallocated here.
240 : */
241 : static List *
242 1540 : textarray_to_stringlist(ArrayType *textarray)
243 : {
244 : Datum *elems;
245 : int nelems,
246 : i;
247 1540 : List *res = NIL;
248 :
249 1540 : deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
250 :
251 1540 : if (nelems == 0)
252 0 : return NIL;
253 :
254 3800 : for (i = 0; i < nelems; i++)
255 2260 : res = lappend(res, makeString(TextDatumGetCString(elems[i])));
256 :
257 1540 : return res;
258 : }
259 :
260 : /*
261 : * Add new state record for a subscription table.
262 : *
263 : * If retain_lock is true, then don't release the locks taken in this function.
264 : * We normally release the locks at the end of transaction but in binary-upgrade
265 : * mode, we expect to release those immediately.
266 : */
267 : void
268 406 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
269 : XLogRecPtr sublsn, bool retain_lock)
270 : {
271 : Relation rel;
272 : HeapTuple tup;
273 : bool nulls[Natts_pg_subscription_rel];
274 : Datum values[Natts_pg_subscription_rel];
275 :
276 406 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
277 :
278 406 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
279 :
280 : /* Try finding existing mapping. */
281 406 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
282 : ObjectIdGetDatum(relid),
283 : ObjectIdGetDatum(subid));
284 406 : if (HeapTupleIsValid(tup))
285 0 : elog(ERROR, "subscription table %u in subscription %u already exists",
286 : relid, subid);
287 :
288 : /* Form the tuple. */
289 406 : memset(values, 0, sizeof(values));
290 406 : memset(nulls, false, sizeof(nulls));
291 406 : values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
292 406 : values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
293 406 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
294 406 : if (sublsn != InvalidXLogRecPtr)
295 2 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
296 : else
297 404 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
298 :
299 406 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
300 :
301 : /* Insert tuple into catalog. */
302 406 : CatalogTupleInsert(rel, tup);
303 :
304 406 : heap_freetuple(tup);
305 :
306 : /* Cleanup. */
307 406 : if (retain_lock)
308 : {
309 402 : table_close(rel, NoLock);
310 : }
311 : else
312 : {
313 4 : table_close(rel, RowExclusiveLock);
314 4 : UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
315 : }
316 406 : }
317 :
318 : /*
319 : * Update the state of a subscription table.
320 : */
321 : void
322 1472 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
323 : XLogRecPtr sublsn, bool already_locked)
324 : {
325 : Relation rel;
326 : HeapTuple tup;
327 : bool nulls[Natts_pg_subscription_rel];
328 : Datum values[Natts_pg_subscription_rel];
329 : bool replaces[Natts_pg_subscription_rel];
330 :
331 1472 : if (already_locked)
332 : {
333 : #ifdef USE_ASSERT_CHECKING
334 : LOCKTAG tag;
335 :
336 : Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
337 : RowExclusiveLock, true));
338 : SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
339 : Assert(LockHeldByMe(&tag, AccessShareLock, true));
340 : #endif
341 :
342 356 : rel = table_open(SubscriptionRelRelationId, NoLock);
343 : }
344 : else
345 : {
346 1116 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
347 1116 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
348 : }
349 :
350 : /* Try finding existing mapping. */
351 1472 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
352 : ObjectIdGetDatum(relid),
353 : ObjectIdGetDatum(subid));
354 1472 : if (!HeapTupleIsValid(tup))
355 0 : elog(ERROR, "subscription table %u in subscription %u does not exist",
356 : relid, subid);
357 :
358 : /* Update the tuple. */
359 1472 : memset(values, 0, sizeof(values));
360 1472 : memset(nulls, false, sizeof(nulls));
361 1472 : memset(replaces, false, sizeof(replaces));
362 :
363 1472 : replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
364 1472 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
365 :
366 1472 : replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
367 1472 : if (sublsn != InvalidXLogRecPtr)
368 720 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
369 : else
370 752 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
371 :
372 1472 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
373 : replaces);
374 :
375 : /* Update the catalog. */
376 1472 : CatalogTupleUpdate(rel, &tup->t_self, tup);
377 :
378 : /* Cleanup. */
379 1472 : table_close(rel, NoLock);
380 1472 : }
381 :
382 : /*
383 : * Get state of subscription table.
384 : *
385 : * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
386 : */
387 : char
388 8388 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
389 : {
390 : HeapTuple tup;
391 : char substate;
392 : bool isnull;
393 : Datum d;
394 : Relation rel;
395 :
396 : /*
397 : * This is to avoid the race condition with AlterSubscription which tries
398 : * to remove this relstate.
399 : */
400 8388 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
401 :
402 : /* Try finding the mapping. */
403 8388 : tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
404 : ObjectIdGetDatum(relid),
405 : ObjectIdGetDatum(subid));
406 :
407 8388 : if (!HeapTupleIsValid(tup))
408 : {
409 52 : table_close(rel, AccessShareLock);
410 52 : *sublsn = InvalidXLogRecPtr;
411 52 : return SUBREL_STATE_UNKNOWN;
412 : }
413 :
414 : /* Get the state. */
415 8336 : substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
416 :
417 : /* Get the LSN */
418 8336 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
419 : Anum_pg_subscription_rel_srsublsn, &isnull);
420 8336 : if (isnull)
421 7234 : *sublsn = InvalidXLogRecPtr;
422 : else
423 1102 : *sublsn = DatumGetLSN(d);
424 :
425 : /* Cleanup */
426 8336 : ReleaseSysCache(tup);
427 :
428 8336 : table_close(rel, AccessShareLock);
429 :
430 8336 : return substate;
431 : }
432 :
433 : /*
434 : * Drop subscription relation mapping. These can be for a particular
435 : * subscription, or for a particular relation, or both.
436 : */
437 : void
438 48176 : RemoveSubscriptionRel(Oid subid, Oid relid)
439 : {
440 : Relation rel;
441 : TableScanDesc scan;
442 : ScanKeyData skey[2];
443 : HeapTuple tup;
444 48176 : int nkeys = 0;
445 :
446 48176 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
447 :
448 48176 : if (OidIsValid(subid))
449 : {
450 260 : ScanKeyInit(&skey[nkeys++],
451 : Anum_pg_subscription_rel_srsubid,
452 : BTEqualStrategyNumber,
453 : F_OIDEQ,
454 : ObjectIdGetDatum(subid));
455 : }
456 :
457 48176 : if (OidIsValid(relid))
458 : {
459 47956 : ScanKeyInit(&skey[nkeys++],
460 : Anum_pg_subscription_rel_srrelid,
461 : BTEqualStrategyNumber,
462 : F_OIDEQ,
463 : ObjectIdGetDatum(relid));
464 : }
465 :
466 : /* Do the search and delete what we found. */
467 48176 : scan = table_beginscan_catalog(rel, nkeys, skey);
468 48402 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
469 : {
470 : Form_pg_subscription_rel subrel;
471 :
472 226 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
473 :
474 : /*
475 : * We don't allow to drop the relation mapping when the table
476 : * synchronization is in progress unless the caller updates the
477 : * corresponding subscription as well. This is to ensure that we don't
478 : * leave tablesync slots or origins in the system when the
479 : * corresponding table is dropped.
480 : */
481 226 : if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
482 : {
483 0 : ereport(ERROR,
484 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
485 : errmsg("could not drop relation mapping for subscription \"%s\"",
486 : get_subscription_name(subrel->srsubid, false)),
487 : errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
488 : get_rel_name(relid), subrel->srsubstate),
489 :
490 : /*
491 : * translator: first %s is a SQL ALTER command and second %s is a
492 : * SQL DROP command
493 : */
494 : errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
495 : "ALTER SUBSCRIPTION ... ENABLE",
496 : "DROP SUBSCRIPTION ...")));
497 : }
498 :
499 226 : CatalogTupleDelete(rel, &tup->t_self);
500 : }
501 48176 : table_endscan(scan);
502 :
503 48176 : table_close(rel, RowExclusiveLock);
504 48176 : }
505 :
506 : /*
507 : * Does the subscription have any relations?
508 : *
509 : * Use this function only to know true/false, and when you have no need for the
510 : * List returned by GetSubscriptionRelations.
511 : */
512 : bool
513 492 : HasSubscriptionRelations(Oid subid)
514 : {
515 : Relation rel;
516 : ScanKeyData skey[1];
517 : SysScanDesc scan;
518 : bool has_subrels;
519 :
520 492 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
521 :
522 492 : ScanKeyInit(&skey[0],
523 : Anum_pg_subscription_rel_srsubid,
524 : BTEqualStrategyNumber, F_OIDEQ,
525 : ObjectIdGetDatum(subid));
526 :
527 492 : scan = systable_beginscan(rel, InvalidOid, false,
528 : NULL, 1, skey);
529 :
530 : /* If even a single tuple exists then the subscription has tables. */
531 492 : has_subrels = HeapTupleIsValid(systable_getnext(scan));
532 :
533 : /* Cleanup */
534 492 : systable_endscan(scan);
535 492 : table_close(rel, AccessShareLock);
536 :
537 492 : return has_subrels;
538 : }
539 :
540 : /*
541 : * Get the relations for the subscription.
542 : *
543 : * If not_ready is true, return only the relations that are not in a ready
544 : * state, otherwise return all the relations of the subscription. The
545 : * returned list is palloc'ed in the current memory context.
546 : */
547 : List *
548 2028 : GetSubscriptionRelations(Oid subid, bool not_ready)
549 : {
550 2028 : List *res = NIL;
551 : Relation rel;
552 : HeapTuple tup;
553 2028 : int nkeys = 0;
554 : ScanKeyData skey[2];
555 : SysScanDesc scan;
556 :
557 2028 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
558 :
559 2028 : ScanKeyInit(&skey[nkeys++],
560 : Anum_pg_subscription_rel_srsubid,
561 : BTEqualStrategyNumber, F_OIDEQ,
562 : ObjectIdGetDatum(subid));
563 :
564 2028 : if (not_ready)
565 1964 : ScanKeyInit(&skey[nkeys++],
566 : Anum_pg_subscription_rel_srsubstate,
567 : BTEqualStrategyNumber, F_CHARNE,
568 : CharGetDatum(SUBREL_STATE_READY));
569 :
570 2028 : scan = systable_beginscan(rel, InvalidOid, false,
571 : NULL, nkeys, skey);
572 :
573 5098 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
574 : {
575 : Form_pg_subscription_rel subrel;
576 : SubscriptionRelState *relstate;
577 : Datum d;
578 : bool isnull;
579 :
580 3070 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
581 :
582 3070 : relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
583 3070 : relstate->relid = subrel->srrelid;
584 3070 : relstate->state = subrel->srsubstate;
585 3070 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
586 : Anum_pg_subscription_rel_srsublsn, &isnull);
587 3070 : if (isnull)
588 2532 : relstate->lsn = InvalidXLogRecPtr;
589 : else
590 538 : relstate->lsn = DatumGetLSN(d);
591 :
592 3070 : res = lappend(res, relstate);
593 : }
594 :
595 : /* Cleanup */
596 2028 : systable_endscan(scan);
597 2028 : table_close(rel, AccessShareLock);
598 :
599 2028 : return res;
600 : }
|