Branch data Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pg_subscription.c
4 : : * replication subscriptions
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/catalog/pg_subscription.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "access/genam.h"
18 : : #include "access/heapam.h"
19 : : #include "access/htup_details.h"
20 : : #include "access/tableam.h"
21 : : #include "catalog/indexing.h"
22 : : #include "catalog/pg_foreign_server.h"
23 : : #include "catalog/pg_subscription.h"
24 : : #include "catalog/pg_subscription_rel.h"
25 : : #include "catalog/pg_type.h"
26 : : #include "foreign/foreign.h"
27 : : #include "miscadmin.h"
28 : : #include "storage/lmgr.h"
29 : : #include "storage/lock.h"
30 : : #include "utils/acl.h"
31 : : #include "utils/array.h"
32 : : #include "utils/builtins.h"
33 : : #include "utils/fmgroids.h"
34 : : #include "utils/lsyscache.h"
35 : : #include "utils/memutils.h"
36 : : #include "utils/pg_lsn.h"
37 : : #include "utils/rel.h"
38 : : #include "utils/syscache.h"
39 : :
40 : : static List *textarray_to_stringlist(ArrayType *textarray);
41 : :
42 : : /*
43 : : * Add a comma-separated list of publication names to the 'dest' string.
44 : : *
45 : : * If quote_literal is true, the returned list can be used to construct an SQL
46 : : * command, thus no translation is applied. Otherwise, the string can be used
47 : : * to create a user-facing message, so translatable quote marks are added.
48 : : */
49 : : void
50 : 543 : GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
51 : : {
52 : : ListCell *lc;
53 : 543 : bool first = true;
54 : :
55 : : Assert(publications != NIL);
56 : :
57 [ + - + + : 1394 : foreach(lc, publications)
+ + ]
58 : : {
59 : 851 : char *pubname = strVal(lfirst(lc));
60 : :
61 [ + + ]: 851 : if (quote_literal)
62 : : {
63 [ + + ]: 841 : if (!first)
64 : 307 : appendStringInfoString(dest, ", ");
65 : 841 : appendStringInfoString(dest, quote_literal_cstr(pubname));
66 : : }
67 : : else
68 : : {
69 [ + + ]: 10 : if (first)
70 : 9 : appendStringInfo(dest, _("\"%s\""), pubname);
71 : : else
72 : 1 : appendStringInfo(dest, _(", \"%s\""), pubname);
73 : : }
74 : :
75 : 851 : first = false;
76 : : }
77 : 543 : }
78 : :
79 : : /*
80 : : * Fetch the subscription from the syscache.
81 : : *
82 : : * If conninfo_needed is true, conninfo will be constructed, possibly
83 : : * encountering errors in ForeignServerConnectionString(). Callers not
84 : : * expecting such errors should pass false, in which case conninfo will be
85 : : * NULL.
86 : : */
87 : : Subscription *
88 : 1057 : GetSubscription(Oid subid, bool missing_ok, bool conninfo_needed,
89 : : bool conninfo_aclcheck)
90 : : {
91 : : HeapTuple tup;
92 : : Subscription *sub;
93 : : Form_pg_subscription subform;
94 : : Datum datum;
95 : : bool isnull;
96 : : MemoryContext cxt;
97 : : MemoryContext oldcxt;
98 : :
99 : : Assert(conninfo_needed || !conninfo_aclcheck);
100 : :
101 : 1057 : tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
102 : :
103 [ + + ]: 1057 : if (!HeapTupleIsValid(tup))
104 : : {
105 [ + - ]: 66 : if (missing_ok)
106 : 66 : return NULL;
107 : :
108 [ # # ]: 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
109 : : }
110 : :
111 : 991 : cxt = AllocSetContextCreate(CurrentMemoryContext, "subscription",
112 : : ALLOCSET_SMALL_SIZES);
113 : 991 : oldcxt = MemoryContextSwitchTo(cxt);
114 : :
115 : 991 : subform = (Form_pg_subscription) GETSTRUCT(tup);
116 : :
117 : 991 : sub = palloc0_object(Subscription);
118 : 991 : sub->cxt = cxt;
119 : 991 : sub->oid = subid;
120 : 991 : sub->dbid = subform->subdbid;
121 : 991 : sub->skiplsn = subform->subskiplsn;
122 : 991 : sub->name = pstrdup(NameStr(subform->subname));
123 : 991 : sub->owner = subform->subowner;
124 : 991 : sub->enabled = subform->subenabled;
125 : 991 : sub->binary = subform->subbinary;
126 : 991 : sub->stream = subform->substream;
127 : 991 : sub->twophasestate = subform->subtwophasestate;
128 : 991 : sub->disableonerr = subform->subdisableonerr;
129 : 991 : sub->passwordrequired = subform->subpasswordrequired;
130 : 991 : sub->runasowner = subform->subrunasowner;
131 : 991 : sub->failover = subform->subfailover;
132 : 991 : sub->retaindeadtuples = subform->subretaindeadtuples;
133 : 991 : sub->maxretention = subform->submaxretention;
134 : 991 : sub->retentionactive = subform->subretentionactive;
135 : 991 : sub->conflictlogrelid = subform->subconflictlogrelid;
136 : :
137 [ + + ]: 991 : if (conninfo_needed)
138 : : {
139 [ + + ]: 855 : if (OidIsValid(subform->subserver))
140 : : {
141 : : AclResult aclresult;
142 : : ForeignServer *server;
143 : :
144 : 5 : server = GetForeignServer(subform->subserver);
145 : :
146 [ + - ]: 5 : if (conninfo_aclcheck)
147 : : {
148 : : /* recheck ACL if requested */
149 : 5 : aclresult = object_aclcheck(ForeignServerRelationId,
150 : : subform->subserver,
151 : : subform->subowner, ACL_USAGE);
152 : :
153 [ - + ]: 5 : if (aclresult != ACLCHECK_OK)
154 [ # # ]: 0 : ereport(ERROR,
155 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
156 : : errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
157 : : GetUserNameFromId(subform->subowner, false),
158 : : server->servername)));
159 : : }
160 : :
161 : 5 : sub->conninfo = ForeignServerConnectionString(subform->subowner,
162 : : server);
163 : : }
164 : : else
165 : : {
166 : 850 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
167 : : tup,
168 : : Anum_pg_subscription_subconninfo);
169 : 850 : sub->conninfo = TextDatumGetCString(datum);
170 : : }
171 : : }
172 : :
173 : : /* Get slotname */
174 : 991 : datum = SysCacheGetAttr(SUBSCRIPTIONOID,
175 : : tup,
176 : : Anum_pg_subscription_subslotname,
177 : : &isnull);
178 [ + + ]: 991 : if (!isnull)
179 : 947 : sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
180 : : else
181 : 44 : sub->slotname = NULL;
182 : :
183 : : /* Get synccommit */
184 : 991 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
185 : : tup,
186 : : Anum_pg_subscription_subsynccommit);
187 : 991 : sub->synccommit = TextDatumGetCString(datum);
188 : :
189 : : /* Get walrcvtimeout */
190 : 991 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
191 : : tup,
192 : : Anum_pg_subscription_subwalrcvtimeout);
193 : 991 : sub->walrcvtimeout = TextDatumGetCString(datum);
194 : :
195 : : /* Get publications */
196 : 991 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
197 : : tup,
198 : : Anum_pg_subscription_subpublications);
199 : 991 : sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
200 : :
201 : : /* Get origin */
202 : 991 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
203 : : tup,
204 : : Anum_pg_subscription_suborigin);
205 : 991 : sub->origin = TextDatumGetCString(datum);
206 : :
207 : : /* Get conflict log destination */
208 : 991 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
209 : : tup,
210 : : Anum_pg_subscription_subconflictlogdest);
211 : 991 : sub->conflictlogdest = TextDatumGetCString(datum);
212 : :
213 : : /* Is the subscription owner a superuser? */
214 : 991 : sub->ownersuperuser = superuser_arg(sub->owner);
215 : :
216 : 991 : ReleaseSysCache(tup);
217 : :
218 : 991 : MemoryContextSwitchTo(oldcxt);
219 : :
220 : 991 : return sub;
221 : : }
222 : :
223 : : /*
224 : : * Return number of subscriptions defined in given database.
225 : : * Used by dropdb() to check if database can indeed be dropped.
226 : : */
227 : : int
228 : 50 : CountDBSubscriptions(Oid dbid)
229 : : {
230 : 50 : int nsubs = 0;
231 : : Relation rel;
232 : : ScanKeyData scankey;
233 : : SysScanDesc scan;
234 : : HeapTuple tup;
235 : :
236 : 50 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
237 : :
238 : 50 : ScanKeyInit(&scankey,
239 : : Anum_pg_subscription_subdbid,
240 : : BTEqualStrategyNumber, F_OIDEQ,
241 : : ObjectIdGetDatum(dbid));
242 : :
243 : 50 : scan = systable_beginscan(rel, InvalidOid, false,
244 : : NULL, 1, &scankey);
245 : :
246 [ - + ]: 50 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
247 : 0 : nsubs++;
248 : :
249 : 50 : systable_endscan(scan);
250 : :
251 : 50 : table_close(rel, NoLock);
252 : :
253 : 50 : return nsubs;
254 : : }
255 : :
256 : : /*
257 : : * Disable the given subscription.
258 : : */
259 : : void
260 : 4 : DisableSubscription(Oid subid)
261 : : {
262 : : Relation rel;
263 : : bool nulls[Natts_pg_subscription];
264 : : bool replaces[Natts_pg_subscription];
265 : : Datum values[Natts_pg_subscription];
266 : : HeapTuple tup;
267 : :
268 : : /* Look up the subscription in the catalog */
269 : 4 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
270 : 4 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
271 : :
272 [ - + ]: 4 : if (!HeapTupleIsValid(tup))
273 [ # # ]: 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
274 : :
275 : 4 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
276 : :
277 : : /* Form a new tuple. */
278 : 4 : memset(values, 0, sizeof(values));
279 : 4 : memset(nulls, false, sizeof(nulls));
280 : 4 : memset(replaces, false, sizeof(replaces));
281 : :
282 : : /* Set the subscription to disabled. */
283 : 4 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
284 : 4 : replaces[Anum_pg_subscription_subenabled - 1] = true;
285 : :
286 : : /* Update the catalog */
287 : 4 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
288 : : replaces);
289 : 4 : CatalogTupleUpdate(rel, &tup->t_self, tup);
290 : 4 : heap_freetuple(tup);
291 : :
292 : 4 : table_close(rel, NoLock);
293 : 4 : }
294 : :
295 : : /*
296 : : * Convert text array to list of strings.
297 : : *
298 : : * Note: the resulting list of strings is pallocated here.
299 : : */
300 : : static List *
301 : 991 : textarray_to_stringlist(ArrayType *textarray)
302 : : {
303 : : Datum *elems;
304 : : int nelems,
305 : : i;
306 : 991 : List *res = NIL;
307 : :
308 : 991 : deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
309 : :
310 [ - + ]: 991 : if (nelems == 0)
311 : 0 : return NIL;
312 : :
313 [ + + ]: 2383 : for (i = 0; i < nelems; i++)
314 : 1392 : res = lappend(res, makeString(TextDatumGetCString(elems[i])));
315 : :
316 : 991 : return res;
317 : : }
318 : :
319 : : /*
320 : : * Add new state record for a subscription table.
321 : : *
322 : : * If retain_lock is true, then don't release the locks taken in this function.
323 : : * We normally release the locks at the end of transaction but in binary-upgrade
324 : : * mode, we expect to release those immediately.
325 : : */
326 : : void
327 : 228 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
328 : : XLogRecPtr sublsn, bool retain_lock)
329 : : {
330 : : Relation rel;
331 : : HeapTuple tup;
332 : : bool nulls[Natts_pg_subscription_rel];
333 : : Datum values[Natts_pg_subscription_rel];
334 : :
335 : 228 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
336 : :
337 : 228 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
338 : :
339 : : /* Try finding existing mapping. */
340 : 228 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
341 : : ObjectIdGetDatum(relid),
342 : : ObjectIdGetDatum(subid));
343 [ - + ]: 228 : if (HeapTupleIsValid(tup))
344 [ # # ]: 0 : elog(ERROR, "subscription relation %u in subscription %u already exists",
345 : : relid, subid);
346 : :
347 : : /* Form the tuple. */
348 : 228 : memset(values, 0, sizeof(values));
349 : 228 : memset(nulls, false, sizeof(nulls));
350 : 228 : values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
351 : 228 : values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
352 : 228 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
353 [ + + ]: 228 : if (XLogRecPtrIsValid(sublsn))
354 : 2 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
355 : : else
356 : 226 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
357 : :
358 : 228 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
359 : :
360 : : /* Insert tuple into catalog. */
361 : 228 : CatalogTupleInsert(rel, tup);
362 : :
363 : 228 : heap_freetuple(tup);
364 : :
365 : : /* Cleanup. */
366 [ + + ]: 228 : if (retain_lock)
367 : : {
368 : 225 : table_close(rel, NoLock);
369 : : }
370 : : else
371 : : {
372 : 3 : table_close(rel, RowExclusiveLock);
373 : 3 : UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
374 : : }
375 : 228 : }
376 : :
377 : : /*
378 : : * Update the state of a subscription table.
379 : : */
380 : : void
381 : 831 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
382 : : XLogRecPtr sublsn, bool already_locked)
383 : : {
384 : : Relation rel;
385 : : HeapTuple tup;
386 : : bool nulls[Natts_pg_subscription_rel];
387 : : Datum values[Natts_pg_subscription_rel];
388 : : bool replaces[Natts_pg_subscription_rel];
389 : :
390 [ + + ]: 831 : if (already_locked)
391 : : {
392 : : #ifdef USE_ASSERT_CHECKING
393 : : LOCKTAG tag;
394 : :
395 : : Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
396 : : RowExclusiveLock, true));
397 : : SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
398 : : Assert(LockHeldByMe(&tag, AccessShareLock, true));
399 : : #endif
400 : :
401 : 193 : rel = table_open(SubscriptionRelRelationId, NoLock);
402 : : }
403 : : else
404 : : {
405 : 638 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
406 : 638 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
407 : : }
408 : :
409 : : /* Try finding existing mapping. */
410 : 831 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
411 : : ObjectIdGetDatum(relid),
412 : : ObjectIdGetDatum(subid));
413 [ - + ]: 831 : if (!HeapTupleIsValid(tup))
414 [ # # ]: 0 : elog(ERROR, "subscription relation %u in subscription %u does not exist",
415 : : relid, subid);
416 : :
417 : : /* Update the tuple. */
418 : 831 : memset(values, 0, sizeof(values));
419 : 831 : memset(nulls, false, sizeof(nulls));
420 : 831 : memset(replaces, false, sizeof(replaces));
421 : :
422 : 831 : replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
423 : 831 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
424 : :
425 : 831 : replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
426 [ + + ]: 831 : if (XLogRecPtrIsValid(sublsn))
427 : 408 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
428 : : else
429 : 423 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
430 : :
431 : 831 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
432 : : replaces);
433 : :
434 : : /* Update the catalog. */
435 : 831 : CatalogTupleUpdate(rel, &tup->t_self, tup);
436 : :
437 : : /* Cleanup. */
438 : 831 : table_close(rel, NoLock);
439 : 831 : }
440 : :
441 : : /*
442 : : * Get state of subscription table.
443 : : *
444 : : * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
445 : : */
446 : : char
447 : 1312 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
448 : : {
449 : : HeapTuple tup;
450 : : char substate;
451 : : bool isnull;
452 : : Datum d;
453 : : Relation rel;
454 : :
455 : : /*
456 : : * This is to avoid the race condition with AlterSubscription which tries
457 : : * to remove this relstate.
458 : : */
459 : 1312 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
460 : :
461 : : /* Try finding the mapping. */
462 : 1312 : tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
463 : : ObjectIdGetDatum(relid),
464 : : ObjectIdGetDatum(subid));
465 : :
466 [ + + ]: 1312 : if (!HeapTupleIsValid(tup))
467 : : {
468 : 35 : table_close(rel, AccessShareLock);
469 : 35 : *sublsn = InvalidXLogRecPtr;
470 : 35 : return SUBREL_STATE_UNKNOWN;
471 : : }
472 : :
473 : : /* Get the state. */
474 : 1277 : substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
475 : :
476 : : /* Get the LSN */
477 : 1277 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
478 : : Anum_pg_subscription_rel_srsublsn, &isnull);
479 [ + + ]: 1277 : if (isnull)
480 : 706 : *sublsn = InvalidXLogRecPtr;
481 : : else
482 : 571 : *sublsn = DatumGetLSN(d);
483 : :
484 : : /* Cleanup */
485 : 1277 : ReleaseSysCache(tup);
486 : :
487 : 1277 : table_close(rel, AccessShareLock);
488 : :
489 : 1277 : return substate;
490 : : }
491 : :
492 : : /*
493 : : * Drop subscription relation mapping. These can be for a particular
494 : : * subscription, or for a particular relation, or both.
495 : : */
496 : : void
497 : 34165 : RemoveSubscriptionRel(Oid subid, Oid relid)
498 : : {
499 : : Relation rel;
500 : : TableScanDesc scan;
501 : : ScanKeyData skey[2];
502 : : HeapTuple tup;
503 : 34165 : int nkeys = 0;
504 : :
505 : 34165 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
506 : :
507 [ + + ]: 34165 : if (OidIsValid(subid))
508 : : {
509 : 189 : ScanKeyInit(&skey[nkeys++],
510 : : Anum_pg_subscription_rel_srsubid,
511 : : BTEqualStrategyNumber,
512 : : F_OIDEQ,
513 : : ObjectIdGetDatum(subid));
514 : : }
515 : :
516 [ + + ]: 34165 : if (OidIsValid(relid))
517 : : {
518 : 33997 : ScanKeyInit(&skey[nkeys++],
519 : : Anum_pg_subscription_rel_srrelid,
520 : : BTEqualStrategyNumber,
521 : : F_OIDEQ,
522 : : ObjectIdGetDatum(relid));
523 : : }
524 : :
525 : : /* Do the search and delete what we found. */
526 : 34165 : scan = table_beginscan_catalog(rel, nkeys, skey);
527 [ + + ]: 34295 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
528 : : {
529 : : Form_pg_subscription_rel subrel;
530 : :
531 : 130 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
532 : :
533 : : /*
534 : : * We don't allow to drop the relation mapping when the table
535 : : * synchronization is in progress unless the caller updates the
536 : : * corresponding subscription as well. This is to ensure that we don't
537 : : * leave tablesync slots or origins in the system when the
538 : : * corresponding table is dropped. For sequences, however, it's ok to
539 : : * drop them since no separate slots or origins are created during
540 : : * synchronization.
541 : : */
542 [ + + ]: 130 : if (!OidIsValid(subid) &&
543 [ - + - - ]: 16 : subrel->srsubstate != SUBREL_STATE_READY &&
544 : 0 : get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE)
545 : : {
546 [ # # ]: 0 : ereport(ERROR,
547 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
548 : : errmsg("could not drop relation mapping for subscription \"%s\"",
549 : : get_subscription_name(subrel->srsubid, false)),
550 : : errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
551 : : get_rel_name(relid), subrel->srsubstate),
552 : :
553 : : /*
554 : : * translator: first %s is a SQL ALTER command and second %s is a
555 : : * SQL DROP command
556 : : */
557 : : errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
558 : : "ALTER SUBSCRIPTION ... ENABLE",
559 : : "DROP SUBSCRIPTION ...")));
560 : : }
561 : :
562 : 130 : CatalogTupleDelete(rel, &tup->t_self);
563 : : }
564 : 34165 : table_endscan(scan);
565 : :
566 : 34165 : table_close(rel, RowExclusiveLock);
567 : 34165 : }
568 : :
569 : : /*
570 : : * Does the subscription have any tables?
571 : : *
572 : : * Use this function only to know true/false, and when you have no need for the
573 : : * List returned by GetSubscriptionRelations.
574 : : */
575 : : bool
576 : 271 : HasSubscriptionTables(Oid subid)
577 : : {
578 : : Relation rel;
579 : : ScanKeyData skey[1];
580 : : SysScanDesc scan;
581 : : HeapTuple tup;
582 : 271 : bool has_subtables = false;
583 : :
584 : 271 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
585 : :
586 : 271 : ScanKeyInit(&skey[0],
587 : : Anum_pg_subscription_rel_srsubid,
588 : : BTEqualStrategyNumber, F_OIDEQ,
589 : : ObjectIdGetDatum(subid));
590 : :
591 : 271 : scan = systable_beginscan(rel, InvalidOid, false,
592 : : NULL, 1, skey);
593 : :
594 [ + + ]: 325 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
595 : : {
596 : : Form_pg_subscription_rel subrel;
597 : : char relkind;
598 : :
599 : 307 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
600 : 307 : relkind = get_rel_relkind(subrel->srrelid);
601 : :
602 [ + + + + ]: 307 : if (relkind == RELKIND_RELATION ||
603 : : relkind == RELKIND_PARTITIONED_TABLE)
604 : : {
605 : 253 : has_subtables = true;
606 : 253 : break;
607 : : }
608 : : }
609 : :
610 : : /* Cleanup */
611 : 271 : systable_endscan(scan);
612 : 271 : table_close(rel, AccessShareLock);
613 : :
614 : 271 : return has_subtables;
615 : : }
616 : :
617 : : /*
618 : : * Get the relations for the subscription.
619 : : *
620 : : * If not_ready is true, return only the relations that are not in a ready
621 : : * state, otherwise return all the relations of the subscription. The
622 : : * returned list is palloc'ed in the current memory context.
623 : : */
624 : : List *
625 : 1229 : GetSubscriptionRelations(Oid subid, bool tables, bool sequences,
626 : : bool not_ready)
627 : : {
628 : 1229 : List *res = NIL;
629 : : Relation rel;
630 : : HeapTuple tup;
631 : 1229 : int nkeys = 0;
632 : : ScanKeyData skey[2];
633 : : SysScanDesc scan;
634 : :
635 : : /* One or both of 'tables' and 'sequences' must be true. */
636 : : Assert(tables || sequences);
637 : :
638 : 1229 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
639 : :
640 : 1229 : ScanKeyInit(&skey[nkeys++],
641 : : Anum_pg_subscription_rel_srsubid,
642 : : BTEqualStrategyNumber, F_OIDEQ,
643 : : ObjectIdGetDatum(subid));
644 : :
645 [ + + ]: 1229 : if (not_ready)
646 : 1188 : ScanKeyInit(&skey[nkeys++],
647 : : Anum_pg_subscription_rel_srsubstate,
648 : : BTEqualStrategyNumber, F_CHARNE,
649 : : CharGetDatum(SUBREL_STATE_READY));
650 : :
651 : 1229 : scan = systable_beginscan(rel, InvalidOid, false,
652 : : NULL, nkeys, skey);
653 : :
654 [ + + ]: 3227 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
655 : : {
656 : : Form_pg_subscription_rel subrel;
657 : : SubscriptionRelState *relstate;
658 : : Datum d;
659 : : bool isnull;
660 : : char relkind;
661 : :
662 : 1998 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
663 : :
664 : : /* Relation is either a sequence or a table */
665 : 1998 : relkind = get_rel_relkind(subrel->srrelid);
666 : : Assert(relkind == RELKIND_SEQUENCE || relkind == RELKIND_RELATION ||
667 : : relkind == RELKIND_PARTITIONED_TABLE);
668 : :
669 : : /* Skip sequences if they were not requested */
670 [ + + - + ]: 1998 : if ((relkind == RELKIND_SEQUENCE) && !sequences)
671 : 0 : continue;
672 : :
673 : : /* Skip tables if they were not requested */
674 [ + + + + ]: 1998 : if ((relkind == RELKIND_RELATION ||
675 [ - + ]: 1950 : relkind == RELKIND_PARTITIONED_TABLE) && !tables)
676 : 0 : continue;
677 : :
678 : 1998 : relstate = palloc_object(SubscriptionRelState);
679 : 1998 : relstate->relid = subrel->srrelid;
680 : 1998 : relstate->state = subrel->srsubstate;
681 : 1998 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
682 : : Anum_pg_subscription_rel_srsublsn, &isnull);
683 [ + + ]: 1998 : if (isnull)
684 : 1628 : relstate->lsn = InvalidXLogRecPtr;
685 : : else
686 : 370 : relstate->lsn = DatumGetLSN(d);
687 : :
688 : 1998 : res = lappend(res, relstate);
689 : : }
690 : :
691 : : /* Cleanup */
692 : 1229 : systable_endscan(scan);
693 : 1229 : table_close(rel, AccessShareLock);
694 : :
695 : 1229 : return res;
696 : : }
697 : :
698 : : /*
699 : : * Update the dead tuple retention status for the given subscription.
700 : : */
701 : : void
702 : 2 : UpdateDeadTupleRetentionStatus(Oid subid, bool active)
703 : : {
704 : : Relation rel;
705 : : bool nulls[Natts_pg_subscription];
706 : : bool replaces[Natts_pg_subscription];
707 : : Datum values[Natts_pg_subscription];
708 : : HeapTuple tup;
709 : :
710 : : /* Look up the subscription in the catalog */
711 : 2 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
712 : 2 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
713 : :
714 [ - + ]: 2 : if (!HeapTupleIsValid(tup))
715 [ # # ]: 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
716 : :
717 : 2 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
718 : :
719 : : /* Form a new tuple. */
720 : 2 : memset(values, 0, sizeof(values));
721 : 2 : memset(nulls, false, sizeof(nulls));
722 : 2 : memset(replaces, false, sizeof(replaces));
723 : :
724 : : /* Set the subscription to disabled. */
725 : 2 : values[Anum_pg_subscription_subretentionactive - 1] = BoolGetDatum(active);
726 : 2 : replaces[Anum_pg_subscription_subretentionactive - 1] = true;
727 : :
728 : : /* Update the catalog */
729 : 2 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
730 : : replaces);
731 : 2 : CatalogTupleUpdate(rel, &tup->t_self, tup);
732 : 2 : heap_freetuple(tup);
733 : :
734 : 2 : table_close(rel, NoLock);
735 : 2 : }
|