Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pg_subscription.c
4 : : * replication subscriptions
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/catalog/pg_subscription.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "access/genam.h"
18 : : #include "access/heapam.h"
19 : : #include "access/htup_details.h"
20 : : #include "access/tableam.h"
21 : : #include "catalog/indexing.h"
22 : : #include "catalog/pg_foreign_server.h"
23 : : #include "catalog/pg_subscription.h"
24 : : #include "catalog/pg_subscription_rel.h"
25 : : #include "catalog/pg_type.h"
26 : : #include "foreign/foreign.h"
27 : : #include "miscadmin.h"
28 : : #include "storage/lmgr.h"
29 : : #include "storage/lock.h"
30 : : #include "utils/acl.h"
31 : : #include "utils/array.h"
32 : : #include "utils/builtins.h"
33 : : #include "utils/fmgroids.h"
34 : : #include "utils/lsyscache.h"
35 : : #include "utils/memutils.h"
36 : : #include "utils/pg_lsn.h"
37 : : #include "utils/rel.h"
38 : : #include "utils/syscache.h"
39 : :
40 : : static List *textarray_to_stringlist(ArrayType *textarray);
41 : :
42 : : /*
43 : : * Add a comma-separated list of publication names to the 'dest' string.
44 : : *
45 : : * If quote_literal is true, the returned list can be used to construct an SQL
46 : : * command, thus no translation is applied. Otherwise, the string can be used
47 : : * to create a user-facing message, so translatable quote marks are added.
48 : : */
49 : : void
613 michael@paquier.xyz 50 :CBC 542 : GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
51 : : {
52 : : ListCell *lc;
53 : 542 : bool first = true;
54 : :
55 [ - + ]: 542 : Assert(publications != NIL);
56 : :
57 [ + - + + : 1392 : foreach(lc, publications)
+ + ]
58 : : {
59 : 850 : char *pubname = strVal(lfirst(lc));
60 : :
61 [ + + ]: 850 : if (quote_literal)
62 : : {
19 alvherre@kurilemu.de 63 [ + + ]:GNC 840 : if (!first)
64 : 307 : appendStringInfoString(dest, ", ");
613 michael@paquier.xyz 65 :CBC 840 : appendStringInfoString(dest, quote_literal_cstr(pubname));
66 : : }
67 : : else
68 : : {
19 alvherre@kurilemu.de 69 [ + + ]:GNC 10 : if (first)
70 : 9 : appendStringInfo(dest, _("\"%s\""), pubname);
71 : : else
72 : 1 : appendStringInfo(dest, _(", \"%s\""), pubname);
73 : : }
74 : :
75 : 850 : first = false;
76 : : }
613 michael@paquier.xyz 77 :CBC 542 : }
78 : :
79 : : /*
80 : : * Fetch the subscription from the syscache.
81 : : *
82 : : * If conninfo_needed is true, conninfo will be constructed, possibly
83 : : * encountering errors in ForeignServerConnectionString(). Callers not
84 : : * expecting such errors should pass false, in which case conninfo will be
85 : : * NULL.
86 : : */
87 : : Subscription *
13 jdavis@postgresql.or 88 :GNC 978 : GetSubscription(Oid subid, bool missing_ok, bool conninfo_needed,
89 : : bool conninfo_aclcheck)
90 : : {
91 : : HeapTuple tup;
92 : : Subscription *sub;
93 : : Form_pg_subscription subform;
94 : : Datum datum;
95 : : bool isnull;
96 : : MemoryContext cxt;
97 : : MemoryContext oldcxt;
98 : :
99 [ + + - + ]: 978 : Assert(conninfo_needed || !conninfo_aclcheck);
100 : :
3449 peter_e@gmx.net 101 :CBC 978 : tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
102 : :
103 [ + + ]: 978 : if (!HeapTupleIsValid(tup))
104 : : {
105 [ + - ]: 66 : if (missing_ok)
106 : 66 : return NULL;
107 : :
3449 peter_e@gmx.net 108 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
109 : : }
110 : :
98 jdavis@postgresql.or 111 :GNC 912 : cxt = AllocSetContextCreate(CurrentMemoryContext, "subscription",
112 : : ALLOCSET_SMALL_SIZES);
113 : 912 : oldcxt = MemoryContextSwitchTo(cxt);
114 : :
3449 peter_e@gmx.net 115 :CBC 912 : subform = (Form_pg_subscription) GETSTRUCT(tup);
116 : :
13 jdavis@postgresql.or 117 :GNC 912 : sub = palloc0_object(Subscription);
98 118 : 912 : sub->cxt = cxt;
3449 peter_e@gmx.net 119 :CBC 912 : sub->oid = subid;
120 : 912 : sub->dbid = subform->subdbid;
1545 akapila@postgresql.o 121 : 912 : sub->skiplsn = subform->subskiplsn;
3449 peter_e@gmx.net 122 : 912 : sub->name = pstrdup(NameStr(subform->subname));
123 : 912 : sub->owner = subform->subowner;
124 : 912 : sub->enabled = subform->subenabled;
2173 tgl@sss.pgh.pa.us 125 : 912 : sub->binary = subform->subbinary;
2126 akapila@postgresql.o 126 : 912 : sub->stream = subform->substream;
1812 127 : 912 : sub->twophasestate = subform->subtwophasestate;
1569 128 : 912 : sub->disableonerr = subform->subdisableonerr;
1188 rhaas@postgresql.org 129 : 912 : sub->passwordrequired = subform->subpasswordrequired;
1183 130 : 912 : sub->runasowner = subform->subrunasowner;
882 akapila@postgresql.o 131 : 912 : sub->failover = subform->subfailover;
342 akapila@postgresql.o 132 :GNC 912 : sub->retaindeadtuples = subform->subretaindeadtuples;
301 133 : 912 : sub->maxretention = subform->submaxretention;
134 : 912 : sub->retentionactive = subform->subretentionactive;
135 : :
13 jdavis@postgresql.or 136 [ + + ]: 912 : if (conninfo_needed)
137 : : {
138 [ + + ]: 810 : if (OidIsValid(subform->subserver))
139 : : {
140 : : AclResult aclresult;
141 : : ForeignServer *server;
142 : :
143 : 4 : server = GetForeignServer(subform->subserver);
144 : :
145 [ + - ]: 4 : if (conninfo_aclcheck)
146 : : {
147 : : /* recheck ACL if requested */
148 : 4 : aclresult = object_aclcheck(ForeignServerRelationId,
149 : : subform->subserver,
150 : : subform->subowner, ACL_USAGE);
151 : :
152 [ - + ]: 4 : if (aclresult != ACLCHECK_OK)
13 jdavis@postgresql.or 153 [ # # ]:UNC 0 : ereport(ERROR,
154 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
155 : : errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
156 : : GetUserNameFromId(subform->subowner, false),
157 : : server->servername)));
158 : : }
159 : :
13 jdavis@postgresql.or 160 :GNC 4 : sub->conninfo = ForeignServerConnectionString(subform->subowner,
161 : : server);
162 : : }
163 : : else
164 : : {
165 : 806 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
166 : : tup,
167 : : Anum_pg_subscription_subconninfo);
168 : 806 : sub->conninfo = TextDatumGetCString(datum);
169 : : }
170 : : }
171 : :
172 : : /* Get slotname */
3449 peter_e@gmx.net 173 :CBC 912 : datum = SysCacheGetAttr(SUBSCRIPTIONOID,
174 : : tup,
175 : : Anum_pg_subscription_subslotname,
176 : : &isnull);
3339 177 [ + + ]: 912 : if (!isnull)
178 : 868 : sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
179 : : else
180 : 44 : sub->slotname = NULL;
181 : :
182 : : /* Get synccommit */
1193 dgustafsson@postgres 183 : 912 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
184 : : tup,
185 : : Anum_pg_subscription_subsynccommit);
3364 peter_e@gmx.net 186 : 912 : sub->synccommit = TextDatumGetCString(datum);
187 : :
188 : : /* Get walrcvtimeout */
130 fujii@postgresql.org 189 :GNC 912 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
190 : : tup,
191 : : Anum_pg_subscription_subwalrcvtimeout);
192 : 912 : sub->walrcvtimeout = TextDatumGetCString(datum);
193 : :
194 : : /* Get publications */
1193 dgustafsson@postgres 195 :CBC 912 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
196 : : tup,
197 : : Anum_pg_subscription_subpublications);
3449 peter_e@gmx.net 198 : 912 : sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
199 : :
200 : : /* Get origin */
1193 dgustafsson@postgres 201 : 912 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
202 : : tup,
203 : : Anum_pg_subscription_suborigin);
1440 akapila@postgresql.o 204 : 912 : sub->origin = TextDatumGetCString(datum);
205 : :
206 : : /* Is the subscription owner a superuser? */
987 207 : 912 : sub->ownersuperuser = superuser_arg(sub->owner);
208 : :
3449 peter_e@gmx.net 209 : 912 : ReleaseSysCache(tup);
210 : :
98 jdavis@postgresql.or 211 :GNC 912 : MemoryContextSwitchTo(oldcxt);
212 : :
3449 peter_e@gmx.net 213 :CBC 912 : return sub;
214 : : }
215 : :
216 : : /*
217 : : * Return number of subscriptions defined in given database.
218 : : * Used by dropdb() to check if database can indeed be dropped.
219 : : */
220 : : int
221 : 50 : CountDBSubscriptions(Oid dbid)
222 : : {
3331 bruce@momjian.us 223 : 50 : int nsubs = 0;
224 : : Relation rel;
225 : : ScanKeyData scankey;
226 : : SysScanDesc scan;
227 : : HeapTuple tup;
228 : :
2717 andres@anarazel.de 229 : 50 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
230 : :
3449 peter_e@gmx.net 231 : 50 : ScanKeyInit(&scankey,
232 : : Anum_pg_subscription_subdbid,
233 : : BTEqualStrategyNumber, F_OIDEQ,
234 : : ObjectIdGetDatum(dbid));
235 : :
236 : 50 : scan = systable_beginscan(rel, InvalidOid, false,
237 : : NULL, 1, &scankey);
238 : :
239 [ - + ]: 50 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
3449 peter_e@gmx.net 240 :UBC 0 : nsubs++;
241 : :
3449 peter_e@gmx.net 242 :CBC 50 : systable_endscan(scan);
243 : :
2717 andres@anarazel.de 244 : 50 : table_close(rel, NoLock);
245 : :
3449 peter_e@gmx.net 246 : 50 : return nsubs;
247 : : }
248 : :
249 : : /*
250 : : * Disable the given subscription.
251 : : */
252 : : void
1569 akapila@postgresql.o 253 : 4 : DisableSubscription(Oid subid)
254 : : {
255 : : Relation rel;
256 : : bool nulls[Natts_pg_subscription];
257 : : bool replaces[Natts_pg_subscription];
258 : : Datum values[Natts_pg_subscription];
259 : : HeapTuple tup;
260 : :
261 : : /* Look up the subscription in the catalog */
262 : 4 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
263 : 4 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
264 : :
265 [ - + ]: 4 : if (!HeapTupleIsValid(tup))
1569 akapila@postgresql.o 266 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
267 : :
1569 akapila@postgresql.o 268 :CBC 4 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
269 : :
270 : : /* Form a new tuple. */
271 : 4 : memset(values, 0, sizeof(values));
272 : 4 : memset(nulls, false, sizeof(nulls));
273 : 4 : memset(replaces, false, sizeof(replaces));
274 : :
275 : : /* Set the subscription to disabled. */
276 : 4 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
277 : 4 : replaces[Anum_pg_subscription_subenabled - 1] = true;
278 : :
279 : : /* Update the catalog */
280 : 4 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
281 : : replaces);
282 : 4 : CatalogTupleUpdate(rel, &tup->t_self, tup);
283 : 4 : heap_freetuple(tup);
284 : :
285 : 4 : table_close(rel, NoLock);
286 : 4 : }
287 : :
288 : : /*
289 : : * Convert text array to list of strings.
290 : : *
291 : : * Note: the resulting list of strings is pallocated here.
292 : : */
293 : : static List *
3449 peter_e@gmx.net 294 : 912 : textarray_to_stringlist(ArrayType *textarray)
295 : : {
296 : : Datum *elems;
297 : : int nelems,
298 : : i;
3331 bruce@momjian.us 299 : 912 : List *res = NIL;
300 : :
1460 peter@eisentraut.org 301 : 912 : deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
302 : :
3449 peter_e@gmx.net 303 [ - + ]: 912 : if (nelems == 0)
3449 peter_e@gmx.net 304 :UBC 0 : return NIL;
305 : :
3449 peter_e@gmx.net 306 [ + + ]:CBC 2226 : for (i = 0; i < nelems; i++)
3364 307 : 1314 : res = lappend(res, makeString(TextDatumGetCString(elems[i])));
308 : :
3449 309 : 912 : return res;
310 : : }
311 : :
312 : : /*
313 : : * Add new state record for a subscription table.
314 : : *
315 : : * If retain_lock is true, then don't release the locks taken in this function.
316 : : * We normally release the locks at the end of transaction but in binary-upgrade
317 : : * mode, we expect to release those immediately.
318 : : */
319 : : void
3007 320 : 228 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
321 : : XLogRecPtr sublsn, bool retain_lock)
322 : : {
323 : : Relation rel;
324 : : HeapTuple tup;
325 : : bool nulls[Natts_pg_subscription_rel];
326 : : Datum values[Natts_pg_subscription_rel];
327 : :
3284 328 : 228 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
329 : :
2717 andres@anarazel.de 330 : 228 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
331 : :
332 : : /* Try finding existing mapping. */
3386 peter_e@gmx.net 333 : 228 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
334 : : ObjectIdGetDatum(relid),
335 : : ObjectIdGetDatum(subid));
3007 336 [ - + ]: 228 : if (HeapTupleIsValid(tup))
250 akapila@postgresql.o 337 [ # # ]:UNC 0 : elog(ERROR, "subscription relation %u in subscription %u already exists",
338 : : relid, subid);
339 : :
340 : : /* Form the tuple. */
3007 peter_e@gmx.net 341 :CBC 228 : memset(values, 0, sizeof(values));
342 : 228 : memset(nulls, false, sizeof(nulls));
343 : 228 : values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
344 : 228 : values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
345 : 228 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
236 alvherre@kurilemu.de 346 [ + + ]:GNC 228 : if (XLogRecPtrIsValid(sublsn))
3007 peter_e@gmx.net 347 :CBC 2 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
348 : : else
349 : 226 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
350 : :
351 : 228 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
352 : :
353 : : /* Insert tuple into catalog. */
2779 andres@anarazel.de 354 : 228 : CatalogTupleInsert(rel, tup);
355 : :
3007 peter_e@gmx.net 356 : 228 : heap_freetuple(tup);
357 : :
358 : : /* Cleanup. */
910 akapila@postgresql.o 359 [ + + ]: 228 : if (retain_lock)
360 : : {
361 : 225 : table_close(rel, NoLock);
362 : : }
363 : : else
364 : : {
365 : 3 : table_close(rel, RowExclusiveLock);
366 : 3 : UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
367 : : }
3007 peter_e@gmx.net 368 : 228 : }
369 : :
370 : : /*
371 : : * Update the state of a subscription table.
372 : : */
373 : : void
374 : 825 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
375 : : XLogRecPtr sublsn, bool already_locked)
376 : : {
377 : : Relation rel;
378 : : HeapTuple tup;
379 : : bool nulls[Natts_pg_subscription_rel];
380 : : Datum values[Natts_pg_subscription_rel];
381 : : bool replaces[Natts_pg_subscription_rel];
382 : :
333 akapila@postgresql.o 383 [ + + ]: 825 : if (already_locked)
384 : : {
385 : : #ifdef USE_ASSERT_CHECKING
386 : : LOCKTAG tag;
387 : :
388 [ - + ]: 189 : Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
389 : : RowExclusiveLock, true));
390 : 189 : SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
391 [ - + ]: 189 : Assert(LockHeldByMe(&tag, AccessShareLock, true));
392 : : #endif
393 : :
394 : 189 : rel = table_open(SubscriptionRelRelationId, NoLock);
395 : : }
396 : : else
397 : : {
398 : 636 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
399 : 636 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
400 : : }
401 : :
402 : : /* Try finding existing mapping. */
3007 peter_e@gmx.net 403 : 825 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
404 : : ObjectIdGetDatum(relid),
405 : : ObjectIdGetDatum(subid));
406 [ - + ]: 825 : if (!HeapTupleIsValid(tup))
237 akapila@postgresql.o 407 [ # # ]:UNC 0 : elog(ERROR, "subscription relation %u in subscription %u does not exist",
408 : : relid, subid);
409 : :
410 : : /* Update the tuple. */
3007 peter_e@gmx.net 411 :CBC 825 : memset(values, 0, sizeof(values));
412 : 825 : memset(nulls, false, sizeof(nulls));
413 : 825 : memset(replaces, false, sizeof(replaces));
414 : :
415 : 825 : replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
416 : 825 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
417 : :
418 : 825 : replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
236 alvherre@kurilemu.de 419 [ + + ]:GNC 825 : if (XLogRecPtrIsValid(sublsn))
3007 peter_e@gmx.net 420 :CBC 404 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
421 : : else
422 : 421 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
423 : :
424 : 825 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
425 : : replaces);
426 : :
427 : : /* Update the catalog. */
428 : 825 : CatalogTupleUpdate(rel, &tup->t_self, tup);
429 : :
430 : : /* Cleanup. */
2717 andres@anarazel.de 431 : 825 : table_close(rel, NoLock);
3386 peter_e@gmx.net 432 : 825 : }
433 : :
434 : : /*
435 : : * Get state of subscription table.
436 : : *
437 : : * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
438 : : */
439 : : char
2084 alvherre@alvh.no-ip. 440 : 1283 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
441 : : {
442 : : HeapTuple tup;
443 : : char substate;
444 : : bool isnull;
445 : : Datum d;
446 : : Relation rel;
447 : :
448 : : /*
449 : : * This is to avoid the race condition with AlterSubscription which tries
450 : : * to remove this relstate.
451 : : */
1964 akapila@postgresql.o 452 : 1283 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
453 : :
454 : : /* Try finding the mapping. */
3386 peter_e@gmx.net 455 : 1283 : tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
456 : : ObjectIdGetDatum(relid),
457 : : ObjectIdGetDatum(subid));
458 : :
459 [ + + ]: 1283 : if (!HeapTupleIsValid(tup))
460 : : {
1951 akapila@postgresql.o 461 : 32 : table_close(rel, AccessShareLock);
2084 alvherre@alvh.no-ip. 462 : 32 : *sublsn = InvalidXLogRecPtr;
463 : 32 : return SUBREL_STATE_UNKNOWN;
464 : : }
465 : :
466 : : /* Get the state. */
467 : 1251 : substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
468 : :
469 : : /* Get the LSN */
3386 peter_e@gmx.net 470 : 1251 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
471 : : Anum_pg_subscription_rel_srsublsn, &isnull);
472 [ + + ]: 1251 : if (isnull)
473 : 701 : *sublsn = InvalidXLogRecPtr;
474 : : else
475 : 550 : *sublsn = DatumGetLSN(d);
476 : :
477 : : /* Cleanup */
478 : 1251 : ReleaseSysCache(tup);
479 : :
1964 akapila@postgresql.o 480 : 1251 : table_close(rel, AccessShareLock);
481 : :
3386 peter_e@gmx.net 482 : 1251 : return substate;
483 : : }
484 : :
485 : : /*
486 : : * Drop subscription relation mapping. These can be for a particular
487 : : * subscription, or for a particular relation, or both.
488 : : */
489 : : void
490 : 34062 : RemoveSubscriptionRel(Oid subid, Oid relid)
491 : : {
492 : : Relation rel;
493 : : TableScanDesc scan;
494 : : ScanKeyData skey[2];
495 : : HeapTuple tup;
496 : 34062 : int nkeys = 0;
497 : :
2717 andres@anarazel.de 498 : 34062 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
499 : :
3386 peter_e@gmx.net 500 [ + + ]: 34062 : if (OidIsValid(subid))
501 : : {
502 : 171 : ScanKeyInit(&skey[nkeys++],
503 : : Anum_pg_subscription_rel_srsubid,
504 : : BTEqualStrategyNumber,
505 : : F_OIDEQ,
506 : : ObjectIdGetDatum(subid));
507 : : }
508 : :
509 [ + + ]: 34062 : if (OidIsValid(relid))
510 : : {
511 : 33912 : ScanKeyInit(&skey[nkeys++],
512 : : Anum_pg_subscription_rel_srrelid,
513 : : BTEqualStrategyNumber,
514 : : F_OIDEQ,
515 : : ObjectIdGetDatum(relid));
516 : : }
517 : :
518 : : /* Do the search and delete what we found. */
2668 andres@anarazel.de 519 : 34062 : scan = table_beginscan_catalog(rel, nkeys, skey);
3386 peter_e@gmx.net 520 [ + + ]: 34191 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
521 : : {
522 : : Form_pg_subscription_rel subrel;
523 : :
1964 akapila@postgresql.o 524 : 129 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
525 : :
526 : : /*
527 : : * We don't allow to drop the relation mapping when the table
528 : : * synchronization is in progress unless the caller updates the
529 : : * corresponding subscription as well. This is to ensure that we don't
530 : : * leave tablesync slots or origins in the system when the
531 : : * corresponding table is dropped. For sequences, however, it's ok to
532 : : * drop them since no separate slots or origins are created during
533 : : * synchronization.
534 : : */
250 akapila@postgresql.o 535 [ + + ]:GNC 129 : if (!OidIsValid(subid) &&
536 [ - + - - ]: 16 : subrel->srsubstate != SUBREL_STATE_READY &&
250 akapila@postgresql.o 537 :UNC 0 : get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE)
538 : : {
1964 akapila@postgresql.o 539 [ # # ]:UBC 0 : ereport(ERROR,
540 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
541 : : errmsg("could not drop relation mapping for subscription \"%s\"",
542 : : get_subscription_name(subrel->srsubid, false)),
543 : : errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
544 : : get_rel_name(relid), subrel->srsubstate),
545 : :
546 : : /*
547 : : * translator: first %s is a SQL ALTER command and second %s is a
548 : : * SQL DROP command
549 : : */
550 : : errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
551 : : "ALTER SUBSCRIPTION ... ENABLE",
552 : : "DROP SUBSCRIPTION ...")));
553 : : }
554 : :
3303 tgl@sss.pgh.pa.us 555 :CBC 129 : CatalogTupleDelete(rel, &tup->t_self);
556 : : }
2668 andres@anarazel.de 557 : 34062 : table_endscan(scan);
558 : :
2717 559 : 34062 : table_close(rel, RowExclusiveLock);
3386 peter_e@gmx.net 560 : 34062 : }
561 : :
562 : : /*
563 : : * Does the subscription have any tables?
564 : : *
565 : : * Use this function only to know true/false, and when you have no need for the
566 : : * List returned by GetSubscriptionRelations.
567 : : */
568 : : bool
257 akapila@postgresql.o 569 :GNC 258 : HasSubscriptionTables(Oid subid)
570 : : {
571 : : Relation rel;
572 : : ScanKeyData skey[1];
573 : : SysScanDesc scan;
574 : : HeapTuple tup;
250 575 : 258 : bool has_subtables = false;
576 : :
1812 akapila@postgresql.o 577 :CBC 258 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
578 : :
579 : 258 : ScanKeyInit(&skey[0],
580 : : Anum_pg_subscription_rel_srsubid,
581 : : BTEqualStrategyNumber, F_OIDEQ,
582 : : ObjectIdGetDatum(subid));
583 : :
584 : 258 : scan = systable_beginscan(rel, InvalidOid, false,
585 : : NULL, 1, skey);
586 : :
250 akapila@postgresql.o 587 [ + + ]:GNC 296 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
588 : : {
589 : : Form_pg_subscription_rel subrel;
590 : : char relkind;
591 : :
592 : 282 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
593 : 282 : relkind = get_rel_relkind(subrel->srrelid);
594 : :
595 [ + + + + ]: 282 : if (relkind == RELKIND_RELATION ||
596 : : relkind == RELKIND_PARTITIONED_TABLE)
597 : : {
598 : 244 : has_subtables = true;
599 : 244 : break;
600 : : }
601 : : }
602 : :
603 : : /* Cleanup */
1812 akapila@postgresql.o 604 :CBC 258 : systable_endscan(scan);
605 : 258 : table_close(rel, AccessShareLock);
606 : :
250 akapila@postgresql.o 607 :GNC 258 : return has_subtables;
608 : : }
609 : :
610 : : /*
611 : : * Get the relations for the subscription.
612 : : *
613 : : * If not_ready is true, return only the relations that are not in a ready
614 : : * state, otherwise return all the relations of the subscription. The
615 : : * returned list is palloc'ed in the current memory context.
616 : : */
617 : : List *
618 : 1175 : GetSubscriptionRelations(Oid subid, bool tables, bool sequences,
619 : : bool not_ready)
620 : : {
3386 peter_e@gmx.net 621 :CBC 1175 : List *res = NIL;
622 : : Relation rel;
623 : : HeapTuple tup;
624 : 1175 : int nkeys = 0;
625 : : ScanKeyData skey[2];
626 : : SysScanDesc scan;
627 : :
628 : : /* One or both of 'tables' and 'sequences' must be true. */
250 akapila@postgresql.o 629 [ + + - + ]:GNC 1175 : Assert(tables || sequences);
630 : :
2717 andres@anarazel.de 631 :CBC 1175 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
632 : :
3386 peter_e@gmx.net 633 : 1175 : ScanKeyInit(&skey[nkeys++],
634 : : Anum_pg_subscription_rel_srsubid,
635 : : BTEqualStrategyNumber, F_OIDEQ,
636 : : ObjectIdGetDatum(subid));
637 : :
1434 michael@paquier.xyz 638 [ + + ]: 1175 : if (not_ready)
639 : 1134 : ScanKeyInit(&skey[nkeys++],
640 : : Anum_pg_subscription_rel_srsubstate,
641 : : BTEqualStrategyNumber, F_CHARNE,
642 : : CharGetDatum(SUBREL_STATE_READY));
643 : :
3386 peter_e@gmx.net 644 : 1175 : scan = systable_beginscan(rel, InvalidOid, false,
645 : : NULL, nkeys, skey);
646 : :
647 [ + + ]: 3143 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
648 : : {
649 : : Form_pg_subscription_rel subrel;
650 : : SubscriptionRelState *relstate;
651 : : Datum d;
652 : : bool isnull;
653 : : char relkind;
654 : :
655 : 1968 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
656 : :
657 : : /* Relation is either a sequence or a table */
250 akapila@postgresql.o 658 :GNC 1968 : relkind = get_rel_relkind(subrel->srrelid);
659 [ + + + + : 1968 : Assert(relkind == RELKIND_SEQUENCE || relkind == RELKIND_RELATION ||
- + ]
660 : : relkind == RELKIND_PARTITIONED_TABLE);
661 : :
662 : : /* Skip sequences if they were not requested */
663 [ + + - + ]: 1968 : if ((relkind == RELKIND_SEQUENCE) && !sequences)
250 akapila@postgresql.o 664 :UNC 0 : continue;
665 : :
666 : : /* Skip tables if they were not requested */
250 akapila@postgresql.o 667 [ + + + + ]:GNC 1968 : if ((relkind == RELKIND_RELATION ||
668 [ - + ]: 1930 : relkind == RELKIND_PARTITIONED_TABLE) && !tables)
250 akapila@postgresql.o 669 :UNC 0 : continue;
670 : :
202 michael@paquier.xyz 671 :GNC 1968 : relstate = palloc_object(SubscriptionRelState);
3386 peter_e@gmx.net 672 :CBC 1968 : relstate->relid = subrel->srrelid;
673 : 1968 : relstate->state = subrel->srsubstate;
2171 tgl@sss.pgh.pa.us 674 : 1968 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
675 : : Anum_pg_subscription_rel_srsublsn, &isnull);
676 [ + + ]: 1968 : if (isnull)
677 : 1625 : relstate->lsn = InvalidXLogRecPtr;
678 : : else
679 : 343 : relstate->lsn = DatumGetLSN(d);
680 : :
3386 peter_e@gmx.net 681 : 1968 : res = lappend(res, relstate);
682 : : }
683 : :
684 : : /* Cleanup */
685 : 1175 : systable_endscan(scan);
2717 andres@anarazel.de 686 : 1175 : table_close(rel, AccessShareLock);
687 : :
3386 peter_e@gmx.net 688 : 1175 : return res;
689 : : }
690 : :
691 : : /*
692 : : * Update the dead tuple retention status for the given subscription.
693 : : */
694 : : void
301 akapila@postgresql.o 695 :GNC 2 : UpdateDeadTupleRetentionStatus(Oid subid, bool active)
696 : : {
697 : : Relation rel;
698 : : bool nulls[Natts_pg_subscription];
699 : : bool replaces[Natts_pg_subscription];
700 : : Datum values[Natts_pg_subscription];
701 : : HeapTuple tup;
702 : :
703 : : /* Look up the subscription in the catalog */
704 : 2 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
705 : 2 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
706 : :
707 [ - + ]: 2 : if (!HeapTupleIsValid(tup))
301 akapila@postgresql.o 708 [ # # ]:UNC 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
709 : :
301 akapila@postgresql.o 710 :GNC 2 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
711 : :
712 : : /* Form a new tuple. */
713 : 2 : memset(values, 0, sizeof(values));
714 : 2 : memset(nulls, false, sizeof(nulls));
715 : 2 : memset(replaces, false, sizeof(replaces));
716 : :
717 : : /* Set the subscription to disabled. */
71 peter@eisentraut.org 718 : 2 : values[Anum_pg_subscription_subretentionactive - 1] = BoolGetDatum(active);
301 akapila@postgresql.o 719 : 2 : replaces[Anum_pg_subscription_subretentionactive - 1] = true;
720 : :
721 : : /* Update the catalog */
722 : 2 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
723 : : replaces);
724 : 2 : CatalogTupleUpdate(rel, &tup->t_self, tup);
725 : 2 : heap_freetuple(tup);
726 : :
727 : 2 : table_close(rel, NoLock);
728 : 2 : }
|