Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_subscription.c
4 : * replication subscriptions
5 : *
6 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/catalog/pg_subscription.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/heapam.h"
19 : #include "access/htup_details.h"
20 : #include "access/tableam.h"
21 : #include "catalog/indexing.h"
22 : #include "catalog/pg_subscription.h"
23 : #include "catalog/pg_subscription_rel.h"
24 : #include "catalog/pg_type.h"
25 : #include "miscadmin.h"
26 : #include "storage/lmgr.h"
27 : #include "utils/array.h"
28 : #include "utils/builtins.h"
29 : #include "utils/fmgroids.h"
30 : #include "utils/lsyscache.h"
31 : #include "utils/pg_lsn.h"
32 : #include "utils/rel.h"
33 : #include "utils/syscache.h"
34 :
35 : static List *textarray_to_stringlist(ArrayType *textarray);
36 :
37 : /*
38 : * Fetch the subscription from the syscache.
39 : */
40 : Subscription *
41 1288 : GetSubscription(Oid subid, bool missing_ok)
42 : {
43 : HeapTuple tup;
44 : Subscription *sub;
45 : Form_pg_subscription subform;
46 : Datum datum;
47 : bool isnull;
48 :
49 1288 : tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
50 :
51 1284 : if (!HeapTupleIsValid(tup))
52 : {
53 0 : if (missing_ok)
54 0 : return NULL;
55 :
56 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
57 : }
58 :
59 1284 : subform = (Form_pg_subscription) GETSTRUCT(tup);
60 :
61 1284 : sub = (Subscription *) palloc(sizeof(Subscription));
62 1284 : sub->oid = subid;
63 1284 : sub->dbid = subform->subdbid;
64 1284 : sub->skiplsn = subform->subskiplsn;
65 1284 : sub->name = pstrdup(NameStr(subform->subname));
66 1284 : sub->owner = subform->subowner;
67 1284 : sub->enabled = subform->subenabled;
68 1284 : sub->binary = subform->subbinary;
69 1284 : sub->stream = subform->substream;
70 1284 : sub->twophasestate = subform->subtwophasestate;
71 1284 : sub->disableonerr = subform->subdisableonerr;
72 1284 : sub->passwordrequired = subform->subpasswordrequired;
73 1284 : sub->runasowner = subform->subrunasowner;
74 1284 : sub->failover = subform->subfailover;
75 :
76 : /* Get conninfo */
77 1284 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
78 : tup,
79 : Anum_pg_subscription_subconninfo);
80 1284 : sub->conninfo = TextDatumGetCString(datum);
81 :
82 : /* Get slotname */
83 1284 : datum = SysCacheGetAttr(SUBSCRIPTIONOID,
84 : tup,
85 : Anum_pg_subscription_subslotname,
86 : &isnull);
87 1284 : if (!isnull)
88 1218 : sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
89 : else
90 66 : sub->slotname = NULL;
91 :
92 : /* Get synccommit */
93 1284 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
94 : tup,
95 : Anum_pg_subscription_subsynccommit);
96 1284 : sub->synccommit = TextDatumGetCString(datum);
97 :
98 : /* Get publications */
99 1284 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
100 : tup,
101 : Anum_pg_subscription_subpublications);
102 1284 : sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
103 :
104 : /* Get origin */
105 1284 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
106 : tup,
107 : Anum_pg_subscription_suborigin);
108 1284 : sub->origin = TextDatumGetCString(datum);
109 :
110 : /* Is the subscription owner a superuser? */
111 1284 : sub->ownersuperuser = superuser_arg(sub->owner);
112 :
113 1284 : ReleaseSysCache(tup);
114 :
115 1284 : return sub;
116 : }
117 :
118 : /*
119 : * Return number of subscriptions defined in given database.
120 : * Used by dropdb() to check if database can indeed be dropped.
121 : */
122 : int
123 62 : CountDBSubscriptions(Oid dbid)
124 : {
125 62 : int nsubs = 0;
126 : Relation rel;
127 : ScanKeyData scankey;
128 : SysScanDesc scan;
129 : HeapTuple tup;
130 :
131 62 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
132 :
133 62 : ScanKeyInit(&scankey,
134 : Anum_pg_subscription_subdbid,
135 : BTEqualStrategyNumber, F_OIDEQ,
136 : ObjectIdGetDatum(dbid));
137 :
138 62 : scan = systable_beginscan(rel, InvalidOid, false,
139 : NULL, 1, &scankey);
140 :
141 62 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
142 0 : nsubs++;
143 :
144 62 : systable_endscan(scan);
145 :
146 62 : table_close(rel, NoLock);
147 :
148 62 : return nsubs;
149 : }
150 :
151 : /*
152 : * Free memory allocated by subscription struct.
153 : */
154 : void
155 64 : FreeSubscription(Subscription *sub)
156 : {
157 64 : pfree(sub->name);
158 64 : pfree(sub->conninfo);
159 64 : if (sub->slotname)
160 64 : pfree(sub->slotname);
161 64 : list_free_deep(sub->publications);
162 64 : pfree(sub);
163 64 : }
164 :
165 : /*
166 : * Disable the given subscription.
167 : */
168 : void
169 8 : DisableSubscription(Oid subid)
170 : {
171 : Relation rel;
172 : bool nulls[Natts_pg_subscription];
173 : bool replaces[Natts_pg_subscription];
174 : Datum values[Natts_pg_subscription];
175 : HeapTuple tup;
176 :
177 : /* Look up the subscription in the catalog */
178 8 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
179 8 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
180 :
181 8 : if (!HeapTupleIsValid(tup))
182 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
183 :
184 8 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
185 :
186 : /* Form a new tuple. */
187 8 : memset(values, 0, sizeof(values));
188 8 : memset(nulls, false, sizeof(nulls));
189 8 : memset(replaces, false, sizeof(replaces));
190 :
191 : /* Set the subscription to disabled. */
192 8 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
193 8 : replaces[Anum_pg_subscription_subenabled - 1] = true;
194 :
195 : /* Update the catalog */
196 8 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
197 : replaces);
198 8 : CatalogTupleUpdate(rel, &tup->t_self, tup);
199 8 : heap_freetuple(tup);
200 :
201 8 : table_close(rel, NoLock);
202 8 : }
203 :
204 : /*
205 : * Convert text array to list of strings.
206 : *
207 : * Note: the resulting list of strings is pallocated here.
208 : */
209 : static List *
210 1284 : textarray_to_stringlist(ArrayType *textarray)
211 : {
212 : Datum *elems;
213 : int nelems,
214 : i;
215 1284 : List *res = NIL;
216 :
217 1284 : deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
218 :
219 1284 : if (nelems == 0)
220 0 : return NIL;
221 :
222 3228 : for (i = 0; i < nelems; i++)
223 1944 : res = lappend(res, makeString(TextDatumGetCString(elems[i])));
224 :
225 1284 : return res;
226 : }
227 :
228 : /*
229 : * Add new state record for a subscription table.
230 : *
231 : * If retain_lock is true, then don't release the locks taken in this function.
232 : * We normally release the locks at the end of transaction but in binary-upgrade
233 : * mode, we expect to release those immediately.
234 : */
235 : void
236 350 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
237 : XLogRecPtr sublsn, bool retain_lock)
238 : {
239 : Relation rel;
240 : HeapTuple tup;
241 : bool nulls[Natts_pg_subscription_rel];
242 : Datum values[Natts_pg_subscription_rel];
243 :
244 350 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
245 :
246 350 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
247 :
248 : /* Try finding existing mapping. */
249 350 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
250 : ObjectIdGetDatum(relid),
251 : ObjectIdGetDatum(subid));
252 350 : if (HeapTupleIsValid(tup))
253 0 : elog(ERROR, "subscription table %u in subscription %u already exists",
254 : relid, subid);
255 :
256 : /* Form the tuple. */
257 350 : memset(values, 0, sizeof(values));
258 350 : memset(nulls, false, sizeof(nulls));
259 350 : values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
260 350 : values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
261 350 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
262 350 : if (sublsn != InvalidXLogRecPtr)
263 2 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
264 : else
265 348 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
266 :
267 350 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
268 :
269 : /* Insert tuple into catalog. */
270 350 : CatalogTupleInsert(rel, tup);
271 :
272 350 : heap_freetuple(tup);
273 :
274 : /* Cleanup. */
275 350 : if (retain_lock)
276 : {
277 346 : table_close(rel, NoLock);
278 : }
279 : else
280 : {
281 4 : table_close(rel, RowExclusiveLock);
282 4 : UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
283 : }
284 350 : }
285 :
286 : /*
287 : * Update the state of a subscription table.
288 : */
289 : void
290 1294 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
291 : XLogRecPtr sublsn)
292 : {
293 : Relation rel;
294 : HeapTuple tup;
295 : bool nulls[Natts_pg_subscription_rel];
296 : Datum values[Natts_pg_subscription_rel];
297 : bool replaces[Natts_pg_subscription_rel];
298 :
299 1294 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
300 :
301 1294 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
302 :
303 : /* Try finding existing mapping. */
304 1294 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
305 : ObjectIdGetDatum(relid),
306 : ObjectIdGetDatum(subid));
307 1294 : if (!HeapTupleIsValid(tup))
308 0 : elog(ERROR, "subscription table %u in subscription %u does not exist",
309 : relid, subid);
310 :
311 : /* Update the tuple. */
312 1294 : memset(values, 0, sizeof(values));
313 1294 : memset(nulls, false, sizeof(nulls));
314 1294 : memset(replaces, false, sizeof(replaces));
315 :
316 1294 : replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
317 1294 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
318 :
319 1294 : replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
320 1294 : if (sublsn != InvalidXLogRecPtr)
321 636 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
322 : else
323 658 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
324 :
325 1294 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
326 : replaces);
327 :
328 : /* Update the catalog. */
329 1294 : CatalogTupleUpdate(rel, &tup->t_self, tup);
330 :
331 : /* Cleanup. */
332 1294 : table_close(rel, NoLock);
333 1294 : }
334 :
335 : /*
336 : * Get state of subscription table.
337 : *
338 : * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
339 : */
340 : char
341 2028 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
342 : {
343 : HeapTuple tup;
344 : char substate;
345 : bool isnull;
346 : Datum d;
347 : Relation rel;
348 :
349 : /*
350 : * This is to avoid the race condition with AlterSubscription which tries
351 : * to remove this relstate.
352 : */
353 2028 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
354 :
355 : /* Try finding the mapping. */
356 2028 : tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
357 : ObjectIdGetDatum(relid),
358 : ObjectIdGetDatum(subid));
359 :
360 2028 : if (!HeapTupleIsValid(tup))
361 : {
362 52 : table_close(rel, AccessShareLock);
363 52 : *sublsn = InvalidXLogRecPtr;
364 52 : return SUBREL_STATE_UNKNOWN;
365 : }
366 :
367 : /* Get the state. */
368 1976 : substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
369 :
370 : /* Get the LSN */
371 1976 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
372 : Anum_pg_subscription_rel_srsublsn, &isnull);
373 1976 : if (isnull)
374 1062 : *sublsn = InvalidXLogRecPtr;
375 : else
376 914 : *sublsn = DatumGetLSN(d);
377 :
378 : /* Cleanup */
379 1976 : ReleaseSysCache(tup);
380 :
381 1976 : table_close(rel, AccessShareLock);
382 :
383 1976 : return substate;
384 : }
385 :
386 : /*
387 : * Drop subscription relation mapping. These can be for a particular
388 : * subscription, or for a particular relation, or both.
389 : */
390 : void
391 42534 : RemoveSubscriptionRel(Oid subid, Oid relid)
392 : {
393 : Relation rel;
394 : TableScanDesc scan;
395 : ScanKeyData skey[2];
396 : HeapTuple tup;
397 42534 : int nkeys = 0;
398 :
399 42534 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
400 :
401 42534 : if (OidIsValid(subid))
402 : {
403 210 : ScanKeyInit(&skey[nkeys++],
404 : Anum_pg_subscription_rel_srsubid,
405 : BTEqualStrategyNumber,
406 : F_OIDEQ,
407 : ObjectIdGetDatum(subid));
408 : }
409 :
410 42534 : if (OidIsValid(relid))
411 : {
412 42360 : ScanKeyInit(&skey[nkeys++],
413 : Anum_pg_subscription_rel_srrelid,
414 : BTEqualStrategyNumber,
415 : F_OIDEQ,
416 : ObjectIdGetDatum(relid));
417 : }
418 :
419 : /* Do the search and delete what we found. */
420 42534 : scan = table_beginscan_catalog(rel, nkeys, skey);
421 42706 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
422 : {
423 : Form_pg_subscription_rel subrel;
424 :
425 172 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
426 :
427 : /*
428 : * We don't allow to drop the relation mapping when the table
429 : * synchronization is in progress unless the caller updates the
430 : * corresponding subscription as well. This is to ensure that we don't
431 : * leave tablesync slots or origins in the system when the
432 : * corresponding table is dropped.
433 : */
434 172 : if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
435 : {
436 0 : ereport(ERROR,
437 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
438 : errmsg("could not drop relation mapping for subscription \"%s\"",
439 : get_subscription_name(subrel->srsubid, false)),
440 : errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
441 : get_rel_name(relid), subrel->srsubstate),
442 :
443 : /*
444 : * translator: first %s is a SQL ALTER command and second %s is a
445 : * SQL DROP command
446 : */
447 : errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
448 : "ALTER SUBSCRIPTION ... ENABLE",
449 : "DROP SUBSCRIPTION ...")));
450 : }
451 :
452 172 : CatalogTupleDelete(rel, &tup->t_self);
453 : }
454 42534 : table_endscan(scan);
455 :
456 42534 : table_close(rel, RowExclusiveLock);
457 42534 : }
458 :
459 : /*
460 : * Does the subscription have any relations?
461 : *
462 : * Use this function only to know true/false, and when you have no need for the
463 : * List returned by GetSubscriptionRelations.
464 : */
465 : bool
466 386 : HasSubscriptionRelations(Oid subid)
467 : {
468 : Relation rel;
469 : ScanKeyData skey[1];
470 : SysScanDesc scan;
471 : bool has_subrels;
472 :
473 386 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
474 :
475 386 : ScanKeyInit(&skey[0],
476 : Anum_pg_subscription_rel_srsubid,
477 : BTEqualStrategyNumber, F_OIDEQ,
478 : ObjectIdGetDatum(subid));
479 :
480 386 : scan = systable_beginscan(rel, InvalidOid, false,
481 : NULL, 1, skey);
482 :
483 : /* If even a single tuple exists then the subscription has tables. */
484 386 : has_subrels = HeapTupleIsValid(systable_getnext(scan));
485 :
486 : /* Cleanup */
487 386 : systable_endscan(scan);
488 386 : table_close(rel, AccessShareLock);
489 :
490 386 : return has_subrels;
491 : }
492 :
493 : /*
494 : * Get the relations for the subscription.
495 : *
496 : * If not_ready is true, return only the relations that are not in a ready
497 : * state, otherwise return all the relations of the subscription. The
498 : * returned list is palloc'ed in the current memory context.
499 : */
500 : List *
501 1656 : GetSubscriptionRelations(Oid subid, bool not_ready)
502 : {
503 1656 : List *res = NIL;
504 : Relation rel;
505 : HeapTuple tup;
506 1656 : int nkeys = 0;
507 : ScanKeyData skey[2];
508 : SysScanDesc scan;
509 :
510 1656 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
511 :
512 1656 : ScanKeyInit(&skey[nkeys++],
513 : Anum_pg_subscription_rel_srsubid,
514 : BTEqualStrategyNumber, F_OIDEQ,
515 : ObjectIdGetDatum(subid));
516 :
517 1656 : if (not_ready)
518 1600 : ScanKeyInit(&skey[nkeys++],
519 : Anum_pg_subscription_rel_srsubstate,
520 : BTEqualStrategyNumber, F_CHARNE,
521 : CharGetDatum(SUBREL_STATE_READY));
522 :
523 1656 : scan = systable_beginscan(rel, InvalidOid, false,
524 : NULL, nkeys, skey);
525 :
526 3956 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
527 : {
528 : Form_pg_subscription_rel subrel;
529 : SubscriptionRelState *relstate;
530 : Datum d;
531 : bool isnull;
532 :
533 2300 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
534 :
535 2300 : relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
536 2300 : relstate->relid = subrel->srrelid;
537 2300 : relstate->state = subrel->srsubstate;
538 2300 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
539 : Anum_pg_subscription_rel_srsublsn, &isnull);
540 2300 : if (isnull)
541 1822 : relstate->lsn = InvalidXLogRecPtr;
542 : else
543 478 : relstate->lsn = DatumGetLSN(d);
544 :
545 2300 : res = lappend(res, relstate);
546 : }
547 :
548 : /* Cleanup */
549 1656 : systable_endscan(scan);
550 1656 : table_close(rel, AccessShareLock);
551 :
552 1656 : return res;
553 : }
|