Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_subscription.c
4 : * replication subscriptions
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/catalog/pg_subscription.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/heapam.h"
19 : #include "access/htup_details.h"
20 : #include "access/tableam.h"
21 : #include "access/xact.h"
22 : #include "catalog/indexing.h"
23 : #include "catalog/pg_subscription.h"
24 : #include "catalog/pg_subscription_rel.h"
25 : #include "catalog/pg_type.h"
26 : #include "miscadmin.h"
27 : #include "nodes/makefuncs.h"
28 : #include "storage/lmgr.h"
29 : #include "utils/array.h"
30 : #include "utils/builtins.h"
31 : #include "utils/fmgroids.h"
32 : #include "utils/lsyscache.h"
33 : #include "utils/pg_lsn.h"
34 : #include "utils/rel.h"
35 : #include "utils/syscache.h"
36 :
37 : static List *textarray_to_stringlist(ArrayType *textarray);
38 :
39 : /*
40 : * Fetch the subscription from the syscache.
41 : */
42 : Subscription *
43 1226 : GetSubscription(Oid subid, bool missing_ok)
44 : {
45 : HeapTuple tup;
46 : Subscription *sub;
47 : Form_pg_subscription subform;
48 : Datum datum;
49 : bool isnull;
50 :
51 1226 : tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
52 :
53 1226 : if (!HeapTupleIsValid(tup))
54 : {
55 0 : if (missing_ok)
56 0 : return NULL;
57 :
58 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
59 : }
60 :
61 1226 : subform = (Form_pg_subscription) GETSTRUCT(tup);
62 :
63 1226 : sub = (Subscription *) palloc(sizeof(Subscription));
64 1226 : sub->oid = subid;
65 1226 : sub->dbid = subform->subdbid;
66 1226 : sub->skiplsn = subform->subskiplsn;
67 1226 : sub->name = pstrdup(NameStr(subform->subname));
68 1226 : sub->owner = subform->subowner;
69 1226 : sub->enabled = subform->subenabled;
70 1226 : sub->binary = subform->subbinary;
71 1226 : sub->stream = subform->substream;
72 1226 : sub->twophasestate = subform->subtwophasestate;
73 1226 : sub->disableonerr = subform->subdisableonerr;
74 1226 : sub->passwordrequired = subform->subpasswordrequired;
75 1226 : sub->runasowner = subform->subrunasowner;
76 :
77 : /* Get conninfo */
78 1226 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
79 : tup,
80 : Anum_pg_subscription_subconninfo);
81 1226 : sub->conninfo = TextDatumGetCString(datum);
82 :
83 : /* Get slotname */
84 1226 : datum = SysCacheGetAttr(SUBSCRIPTIONOID,
85 : tup,
86 : Anum_pg_subscription_subslotname,
87 : &isnull);
88 1226 : if (!isnull)
89 1160 : sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
90 : else
91 66 : sub->slotname = NULL;
92 :
93 : /* Get synccommit */
94 1226 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
95 : tup,
96 : Anum_pg_subscription_subsynccommit);
97 1226 : sub->synccommit = TextDatumGetCString(datum);
98 :
99 : /* Get publications */
100 1226 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
101 : tup,
102 : Anum_pg_subscription_subpublications);
103 1226 : sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
104 :
105 : /* Get origin */
106 1226 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
107 : tup,
108 : Anum_pg_subscription_suborigin);
109 1226 : sub->origin = TextDatumGetCString(datum);
110 :
111 : /* Is the subscription owner a superuser? */
112 1226 : sub->ownersuperuser = superuser_arg(sub->owner);
113 :
114 1226 : ReleaseSysCache(tup);
115 :
116 1226 : return sub;
117 : }
118 :
119 : /*
120 : * Return number of subscriptions defined in given database.
121 : * Used by dropdb() to check if database can indeed be dropped.
122 : */
123 : int
124 50 : CountDBSubscriptions(Oid dbid)
125 : {
126 50 : int nsubs = 0;
127 : Relation rel;
128 : ScanKeyData scankey;
129 : SysScanDesc scan;
130 : HeapTuple tup;
131 :
132 50 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
133 :
134 50 : ScanKeyInit(&scankey,
135 : Anum_pg_subscription_subdbid,
136 : BTEqualStrategyNumber, F_OIDEQ,
137 : ObjectIdGetDatum(dbid));
138 :
139 50 : scan = systable_beginscan(rel, InvalidOid, false,
140 : NULL, 1, &scankey);
141 :
142 50 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
143 0 : nsubs++;
144 :
145 50 : systable_endscan(scan);
146 :
147 50 : table_close(rel, NoLock);
148 :
149 50 : return nsubs;
150 : }
151 :
152 : /*
153 : * Free memory allocated by subscription struct.
154 : */
155 : void
156 60 : FreeSubscription(Subscription *sub)
157 : {
158 60 : pfree(sub->name);
159 60 : pfree(sub->conninfo);
160 60 : if (sub->slotname)
161 60 : pfree(sub->slotname);
162 60 : list_free_deep(sub->publications);
163 60 : pfree(sub);
164 60 : }
165 :
166 : /*
167 : * Disable the given subscription.
168 : */
169 : void
170 8 : DisableSubscription(Oid subid)
171 : {
172 : Relation rel;
173 : bool nulls[Natts_pg_subscription];
174 : bool replaces[Natts_pg_subscription];
175 : Datum values[Natts_pg_subscription];
176 : HeapTuple tup;
177 :
178 : /* Look up the subscription in the catalog */
179 8 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
180 8 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
181 :
182 8 : if (!HeapTupleIsValid(tup))
183 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
184 :
185 8 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
186 :
187 : /* Form a new tuple. */
188 8 : memset(values, 0, sizeof(values));
189 8 : memset(nulls, false, sizeof(nulls));
190 8 : memset(replaces, false, sizeof(replaces));
191 :
192 : /* Set the subscription to disabled. */
193 8 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
194 8 : replaces[Anum_pg_subscription_subenabled - 1] = true;
195 :
196 : /* Update the catalog */
197 8 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
198 : replaces);
199 8 : CatalogTupleUpdate(rel, &tup->t_self, tup);
200 8 : heap_freetuple(tup);
201 :
202 8 : table_close(rel, NoLock);
203 8 : }
204 :
205 : /*
206 : * Convert text array to list of strings.
207 : *
208 : * Note: the resulting list of strings is pallocated here.
209 : */
210 : static List *
211 1226 : textarray_to_stringlist(ArrayType *textarray)
212 : {
213 : Datum *elems;
214 : int nelems,
215 : i;
216 1226 : List *res = NIL;
217 :
218 1226 : deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
219 :
220 1226 : if (nelems == 0)
221 0 : return NIL;
222 :
223 3126 : for (i = 0; i < nelems; i++)
224 1900 : res = lappend(res, makeString(TextDatumGetCString(elems[i])));
225 :
226 1226 : return res;
227 : }
228 :
229 : /*
230 : * Add new state record for a subscription table.
231 : */
232 : void
233 332 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
234 : XLogRecPtr sublsn)
235 : {
236 : Relation rel;
237 : HeapTuple tup;
238 : bool nulls[Natts_pg_subscription_rel];
239 : Datum values[Natts_pg_subscription_rel];
240 :
241 332 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
242 :
243 332 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
244 :
245 : /* Try finding existing mapping. */
246 332 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
247 : ObjectIdGetDatum(relid),
248 : ObjectIdGetDatum(subid));
249 332 : if (HeapTupleIsValid(tup))
250 0 : elog(ERROR, "subscription table %u in subscription %u already exists",
251 : relid, subid);
252 :
253 : /* Form the tuple. */
254 332 : memset(values, 0, sizeof(values));
255 332 : memset(nulls, false, sizeof(nulls));
256 332 : values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
257 332 : values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
258 332 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
259 332 : if (sublsn != InvalidXLogRecPtr)
260 0 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
261 : else
262 332 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
263 :
264 332 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
265 :
266 : /* Insert tuple into catalog. */
267 332 : CatalogTupleInsert(rel, tup);
268 :
269 332 : heap_freetuple(tup);
270 :
271 : /* Cleanup. */
272 332 : table_close(rel, NoLock);
273 332 : }
274 :
275 : /*
276 : * Update the state of a subscription table.
277 : */
278 : void
279 1260 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
280 : XLogRecPtr sublsn)
281 : {
282 : Relation rel;
283 : HeapTuple tup;
284 : bool nulls[Natts_pg_subscription_rel];
285 : Datum values[Natts_pg_subscription_rel];
286 : bool replaces[Natts_pg_subscription_rel];
287 :
288 1260 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
289 :
290 1260 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
291 :
292 : /* Try finding existing mapping. */
293 1260 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
294 : ObjectIdGetDatum(relid),
295 : ObjectIdGetDatum(subid));
296 1260 : if (!HeapTupleIsValid(tup))
297 0 : elog(ERROR, "subscription table %u in subscription %u does not exist",
298 : relid, subid);
299 :
300 : /* Update the tuple. */
301 1260 : memset(values, 0, sizeof(values));
302 1260 : memset(nulls, false, sizeof(nulls));
303 1260 : memset(replaces, false, sizeof(replaces));
304 :
305 1260 : replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
306 1260 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
307 :
308 1260 : replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
309 1260 : if (sublsn != InvalidXLogRecPtr)
310 618 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
311 : else
312 642 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
313 :
314 1260 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
315 : replaces);
316 :
317 : /* Update the catalog. */
318 1260 : CatalogTupleUpdate(rel, &tup->t_self, tup);
319 :
320 : /* Cleanup. */
321 1260 : table_close(rel, NoLock);
322 1260 : }
323 :
324 : /*
325 : * Get state of subscription table.
326 : *
327 : * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
328 : */
329 : char
330 1986 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
331 : {
332 : HeapTuple tup;
333 : char substate;
334 : bool isnull;
335 : Datum d;
336 : Relation rel;
337 :
338 : /*
339 : * This is to avoid the race condition with AlterSubscription which tries
340 : * to remove this relstate.
341 : */
342 1986 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
343 :
344 : /* Try finding the mapping. */
345 1986 : tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
346 : ObjectIdGetDatum(relid),
347 : ObjectIdGetDatum(subid));
348 :
349 1986 : if (!HeapTupleIsValid(tup))
350 : {
351 50 : table_close(rel, AccessShareLock);
352 50 : *sublsn = InvalidXLogRecPtr;
353 50 : return SUBREL_STATE_UNKNOWN;
354 : }
355 :
356 : /* Get the state. */
357 1936 : substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
358 :
359 : /* Get the LSN */
360 1936 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
361 : Anum_pg_subscription_rel_srsublsn, &isnull);
362 1936 : if (isnull)
363 1018 : *sublsn = InvalidXLogRecPtr;
364 : else
365 918 : *sublsn = DatumGetLSN(d);
366 :
367 : /* Cleanup */
368 1936 : ReleaseSysCache(tup);
369 :
370 1936 : table_close(rel, AccessShareLock);
371 :
372 1936 : return substate;
373 : }
374 :
375 : /*
376 : * Drop subscription relation mapping. These can be for a particular
377 : * subscription, or for a particular relation, or both.
378 : */
379 : void
380 39070 : RemoveSubscriptionRel(Oid subid, Oid relid)
381 : {
382 : Relation rel;
383 : TableScanDesc scan;
384 : ScanKeyData skey[2];
385 : HeapTuple tup;
386 39070 : int nkeys = 0;
387 :
388 39070 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
389 :
390 39070 : if (OidIsValid(subid))
391 : {
392 208 : ScanKeyInit(&skey[nkeys++],
393 : Anum_pg_subscription_rel_srsubid,
394 : BTEqualStrategyNumber,
395 : F_OIDEQ,
396 : ObjectIdGetDatum(subid));
397 : }
398 :
399 39070 : if (OidIsValid(relid))
400 : {
401 38924 : ScanKeyInit(&skey[nkeys++],
402 : Anum_pg_subscription_rel_srrelid,
403 : BTEqualStrategyNumber,
404 : F_OIDEQ,
405 : ObjectIdGetDatum(relid));
406 : }
407 :
408 : /* Do the search and delete what we found. */
409 39070 : scan = table_beginscan_catalog(rel, nkeys, skey);
410 39240 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
411 : {
412 : Form_pg_subscription_rel subrel;
413 :
414 170 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
415 :
416 : /*
417 : * We don't allow to drop the relation mapping when the table
418 : * synchronization is in progress unless the caller updates the
419 : * corresponding subscription as well. This is to ensure that we don't
420 : * leave tablesync slots or origins in the system when the
421 : * corresponding table is dropped.
422 : */
423 170 : if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
424 : {
425 0 : ereport(ERROR,
426 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
427 : errmsg("could not drop relation mapping for subscription \"%s\"",
428 : get_subscription_name(subrel->srsubid, false)),
429 : errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
430 : get_rel_name(relid), subrel->srsubstate),
431 :
432 : /*
433 : * translator: first %s is a SQL ALTER command and second %s is a
434 : * SQL DROP command
435 : */
436 : errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
437 : "ALTER SUBSCRIPTION ... ENABLE",
438 : "DROP SUBSCRIPTION ...")));
439 : }
440 :
441 170 : CatalogTupleDelete(rel, &tup->t_self);
442 : }
443 39070 : table_endscan(scan);
444 :
445 39070 : table_close(rel, RowExclusiveLock);
446 39070 : }
447 :
448 : /*
449 : * Does the subscription have any relations?
450 : *
451 : * Use this function only to know true/false, and when you have no need for the
452 : * List returned by GetSubscriptionRelations.
453 : */
454 : bool
455 368 : HasSubscriptionRelations(Oid subid)
456 : {
457 : Relation rel;
458 : ScanKeyData skey[1];
459 : SysScanDesc scan;
460 : bool has_subrels;
461 :
462 368 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
463 :
464 368 : ScanKeyInit(&skey[0],
465 : Anum_pg_subscription_rel_srsubid,
466 : BTEqualStrategyNumber, F_OIDEQ,
467 : ObjectIdGetDatum(subid));
468 :
469 368 : scan = systable_beginscan(rel, InvalidOid, false,
470 : NULL, 1, skey);
471 :
472 : /* If even a single tuple exists then the subscription has tables. */
473 368 : has_subrels = HeapTupleIsValid(systable_getnext(scan));
474 :
475 : /* Cleanup */
476 368 : systable_endscan(scan);
477 368 : table_close(rel, AccessShareLock);
478 :
479 368 : return has_subrels;
480 : }
481 :
482 : /*
483 : * Get the relations for the subscription.
484 : *
485 : * If not_ready is true, return only the relations that are not in a ready
486 : * state, otherwise return all the relations of the subscription. The
487 : * returned list is palloc'ed in the current memory context.
488 : */
489 : List *
490 1602 : GetSubscriptionRelations(Oid subid, bool not_ready)
491 : {
492 1602 : List *res = NIL;
493 : Relation rel;
494 : HeapTuple tup;
495 1602 : int nkeys = 0;
496 : ScanKeyData skey[2];
497 : SysScanDesc scan;
498 :
499 1602 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
500 :
501 1602 : ScanKeyInit(&skey[nkeys++],
502 : Anum_pg_subscription_rel_srsubid,
503 : BTEqualStrategyNumber, F_OIDEQ,
504 : ObjectIdGetDatum(subid));
505 :
506 1602 : if (not_ready)
507 1526 : ScanKeyInit(&skey[nkeys++],
508 : Anum_pg_subscription_rel_srsubstate,
509 : BTEqualStrategyNumber, F_CHARNE,
510 : CharGetDatum(SUBREL_STATE_READY));
511 :
512 1602 : scan = systable_beginscan(rel, InvalidOid, false,
513 : NULL, nkeys, skey);
514 :
515 3914 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
516 : {
517 : Form_pg_subscription_rel subrel;
518 : SubscriptionRelState *relstate;
519 : Datum d;
520 : bool isnull;
521 :
522 2312 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
523 :
524 2312 : relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
525 2312 : relstate->relid = subrel->srrelid;
526 2312 : relstate->state = subrel->srsubstate;
527 2312 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
528 : Anum_pg_subscription_rel_srsublsn, &isnull);
529 2312 : if (isnull)
530 1816 : relstate->lsn = InvalidXLogRecPtr;
531 : else
532 496 : relstate->lsn = DatumGetLSN(d);
533 :
534 2312 : res = lappend(res, relstate);
535 : }
536 :
537 : /* Cleanup */
538 1602 : systable_endscan(scan);
539 1602 : table_close(rel, AccessShareLock);
540 :
541 1602 : return res;
542 : }
|