Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_subscription.c
4 : * replication subscriptions
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/catalog/pg_subscription.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/heapam.h"
19 : #include "access/htup_details.h"
20 : #include "access/tableam.h"
21 : #include "catalog/indexing.h"
22 : #include "catalog/pg_subscription.h"
23 : #include "catalog/pg_subscription_rel.h"
24 : #include "catalog/pg_type.h"
25 : #include "miscadmin.h"
26 : #include "storage/lmgr.h"
27 : #include "utils/array.h"
28 : #include "utils/builtins.h"
29 : #include "utils/fmgroids.h"
30 : #include "utils/lsyscache.h"
31 : #include "utils/pg_lsn.h"
32 : #include "utils/rel.h"
33 : #include "utils/syscache.h"
34 :
35 : static List *textarray_to_stringlist(ArrayType *textarray);
36 :
37 : /*
38 : * Add a comma-separated list of publication names to the 'dest' string.
39 : */
40 : void
41 534 : GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
42 : {
43 : ListCell *lc;
44 534 : bool first = true;
45 :
46 : Assert(publications != NIL);
47 :
48 1376 : foreach(lc, publications)
49 : {
50 842 : char *pubname = strVal(lfirst(lc));
51 :
52 842 : if (first)
53 534 : first = false;
54 : else
55 308 : appendStringInfoString(dest, ", ");
56 :
57 842 : if (quote_literal)
58 832 : appendStringInfoString(dest, quote_literal_cstr(pubname));
59 : else
60 : {
61 10 : appendStringInfoChar(dest, '"');
62 10 : appendStringInfoString(dest, pubname);
63 10 : appendStringInfoChar(dest, '"');
64 : }
65 : }
66 534 : }
67 :
68 : /*
69 : * Fetch the subscription from the syscache.
70 : */
71 : Subscription *
72 924 : GetSubscription(Oid subid, bool missing_ok)
73 : {
74 : HeapTuple tup;
75 : Subscription *sub;
76 : Form_pg_subscription subform;
77 : Datum datum;
78 : bool isnull;
79 :
80 924 : tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
81 :
82 924 : if (!HeapTupleIsValid(tup))
83 : {
84 66 : if (missing_ok)
85 66 : return NULL;
86 :
87 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
88 : }
89 :
90 858 : subform = (Form_pg_subscription) GETSTRUCT(tup);
91 :
92 858 : sub = palloc_object(Subscription);
93 858 : sub->oid = subid;
94 858 : sub->dbid = subform->subdbid;
95 858 : sub->skiplsn = subform->subskiplsn;
96 858 : sub->name = pstrdup(NameStr(subform->subname));
97 858 : sub->owner = subform->subowner;
98 858 : sub->enabled = subform->subenabled;
99 858 : sub->binary = subform->subbinary;
100 858 : sub->stream = subform->substream;
101 858 : sub->twophasestate = subform->subtwophasestate;
102 858 : sub->disableonerr = subform->subdisableonerr;
103 858 : sub->passwordrequired = subform->subpasswordrequired;
104 858 : sub->runasowner = subform->subrunasowner;
105 858 : sub->failover = subform->subfailover;
106 858 : sub->retaindeadtuples = subform->subretaindeadtuples;
107 858 : sub->maxretention = subform->submaxretention;
108 858 : sub->retentionactive = subform->subretentionactive;
109 :
110 : /* Get conninfo */
111 858 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
112 : tup,
113 : Anum_pg_subscription_subconninfo);
114 858 : sub->conninfo = TextDatumGetCString(datum);
115 :
116 : /* Get slotname */
117 858 : datum = SysCacheGetAttr(SUBSCRIPTIONOID,
118 : tup,
119 : Anum_pg_subscription_subslotname,
120 : &isnull);
121 858 : if (!isnull)
122 825 : sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
123 : else
124 33 : sub->slotname = NULL;
125 :
126 : /* Get synccommit */
127 858 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
128 : tup,
129 : Anum_pg_subscription_subsynccommit);
130 858 : sub->synccommit = TextDatumGetCString(datum);
131 :
132 : /* Get walrcvtimeout */
133 858 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
134 : tup,
135 : Anum_pg_subscription_subwalrcvtimeout);
136 858 : sub->walrcvtimeout = TextDatumGetCString(datum);
137 :
138 : /* Get publications */
139 858 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
140 : tup,
141 : Anum_pg_subscription_subpublications);
142 858 : sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
143 :
144 : /* Get origin */
145 858 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
146 : tup,
147 : Anum_pg_subscription_suborigin);
148 858 : sub->origin = TextDatumGetCString(datum);
149 :
150 : /* Is the subscription owner a superuser? */
151 858 : sub->ownersuperuser = superuser_arg(sub->owner);
152 :
153 858 : ReleaseSysCache(tup);
154 :
155 858 : return sub;
156 : }
157 :
158 : /*
159 : * Return number of subscriptions defined in given database.
160 : * Used by dropdb() to check if database can indeed be dropped.
161 : */
162 : int
163 47 : CountDBSubscriptions(Oid dbid)
164 : {
165 47 : int nsubs = 0;
166 : Relation rel;
167 : ScanKeyData scankey;
168 : SysScanDesc scan;
169 : HeapTuple tup;
170 :
171 47 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
172 :
173 47 : ScanKeyInit(&scankey,
174 : Anum_pg_subscription_subdbid,
175 : BTEqualStrategyNumber, F_OIDEQ,
176 : ObjectIdGetDatum(dbid));
177 :
178 47 : scan = systable_beginscan(rel, InvalidOid, false,
179 : NULL, 1, &scankey);
180 :
181 47 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
182 0 : nsubs++;
183 :
184 47 : systable_endscan(scan);
185 :
186 47 : table_close(rel, NoLock);
187 :
188 47 : return nsubs;
189 : }
190 :
191 : /*
192 : * Free memory allocated by subscription struct.
193 : */
194 : void
195 31 : FreeSubscription(Subscription *sub)
196 : {
197 31 : pfree(sub->name);
198 31 : pfree(sub->conninfo);
199 31 : if (sub->slotname)
200 31 : pfree(sub->slotname);
201 31 : list_free_deep(sub->publications);
202 31 : pfree(sub);
203 31 : }
204 :
205 : /*
206 : * Disable the given subscription.
207 : */
208 : void
209 4 : DisableSubscription(Oid subid)
210 : {
211 : Relation rel;
212 : bool nulls[Natts_pg_subscription];
213 : bool replaces[Natts_pg_subscription];
214 : Datum values[Natts_pg_subscription];
215 : HeapTuple tup;
216 :
217 : /* Look up the subscription in the catalog */
218 4 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
219 4 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
220 :
221 4 : if (!HeapTupleIsValid(tup))
222 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
223 :
224 4 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
225 :
226 : /* Form a new tuple. */
227 4 : memset(values, 0, sizeof(values));
228 4 : memset(nulls, false, sizeof(nulls));
229 4 : memset(replaces, false, sizeof(replaces));
230 :
231 : /* Set the subscription to disabled. */
232 4 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
233 4 : replaces[Anum_pg_subscription_subenabled - 1] = true;
234 :
235 : /* Update the catalog */
236 4 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
237 : replaces);
238 4 : CatalogTupleUpdate(rel, &tup->t_self, tup);
239 4 : heap_freetuple(tup);
240 :
241 4 : table_close(rel, NoLock);
242 4 : }
243 :
244 : /*
245 : * Convert text array to list of strings.
246 : *
247 : * Note: the resulting list of strings is pallocated here.
248 : */
249 : static List *
250 858 : textarray_to_stringlist(ArrayType *textarray)
251 : {
252 : Datum *elems;
253 : int nelems,
254 : i;
255 858 : List *res = NIL;
256 :
257 858 : deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
258 :
259 858 : if (nelems == 0)
260 0 : return NIL;
261 :
262 2110 : for (i = 0; i < nelems; i++)
263 1252 : res = lappend(res, makeString(TextDatumGetCString(elems[i])));
264 :
265 858 : return res;
266 : }
267 :
268 : /*
269 : * Add new state record for a subscription table.
270 : *
271 : * If retain_lock is true, then don't release the locks taken in this function.
272 : * We normally release the locks at the end of transaction but in binary-upgrade
273 : * mode, we expect to release those immediately.
274 : */
275 : void
276 222 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
277 : XLogRecPtr sublsn, bool retain_lock)
278 : {
279 : Relation rel;
280 : HeapTuple tup;
281 : bool nulls[Natts_pg_subscription_rel];
282 : Datum values[Natts_pg_subscription_rel];
283 :
284 222 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
285 :
286 222 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
287 :
288 : /* Try finding existing mapping. */
289 222 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
290 : ObjectIdGetDatum(relid),
291 : ObjectIdGetDatum(subid));
292 222 : if (HeapTupleIsValid(tup))
293 0 : elog(ERROR, "subscription relation %u in subscription %u already exists",
294 : relid, subid);
295 :
296 : /* Form the tuple. */
297 222 : memset(values, 0, sizeof(values));
298 222 : memset(nulls, false, sizeof(nulls));
299 222 : values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
300 222 : values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
301 222 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
302 222 : if (XLogRecPtrIsValid(sublsn))
303 2 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
304 : else
305 220 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
306 :
307 222 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
308 :
309 : /* Insert tuple into catalog. */
310 222 : CatalogTupleInsert(rel, tup);
311 :
312 222 : heap_freetuple(tup);
313 :
314 : /* Cleanup. */
315 222 : if (retain_lock)
316 : {
317 219 : table_close(rel, NoLock);
318 : }
319 : else
320 : {
321 3 : table_close(rel, RowExclusiveLock);
322 3 : UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
323 : }
324 222 : }
325 :
326 : /*
327 : * Update the state of a subscription table.
328 : */
329 : void
330 786 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
331 : XLogRecPtr sublsn, bool already_locked)
332 : {
333 : Relation rel;
334 : HeapTuple tup;
335 : bool nulls[Natts_pg_subscription_rel];
336 : Datum values[Natts_pg_subscription_rel];
337 : bool replaces[Natts_pg_subscription_rel];
338 :
339 786 : if (already_locked)
340 : {
341 : #ifdef USE_ASSERT_CHECKING
342 : LOCKTAG tag;
343 :
344 : Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
345 : RowExclusiveLock, true));
346 : SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
347 : Assert(LockHeldByMe(&tag, AccessShareLock, true));
348 : #endif
349 :
350 184 : rel = table_open(SubscriptionRelRelationId, NoLock);
351 : }
352 : else
353 : {
354 602 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
355 602 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
356 : }
357 :
358 : /* Try finding existing mapping. */
359 786 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
360 : ObjectIdGetDatum(relid),
361 : ObjectIdGetDatum(subid));
362 786 : if (!HeapTupleIsValid(tup))
363 0 : elog(ERROR, "subscription relation %u in subscription %u does not exist",
364 : relid, subid);
365 :
366 : /* Update the tuple. */
367 786 : memset(values, 0, sizeof(values));
368 786 : memset(nulls, false, sizeof(nulls));
369 786 : memset(replaces, false, sizeof(replaces));
370 :
371 786 : replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
372 786 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
373 :
374 786 : replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
375 786 : if (XLogRecPtrIsValid(sublsn))
376 384 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
377 : else
378 402 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
379 :
380 786 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
381 : replaces);
382 :
383 : /* Update the catalog. */
384 786 : CatalogTupleUpdate(rel, &tup->t_self, tup);
385 :
386 : /* Cleanup. */
387 786 : table_close(rel, NoLock);
388 786 : }
389 :
390 : /*
391 : * Get state of subscription table.
392 : *
393 : * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
394 : */
395 : char
396 4247 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
397 : {
398 : HeapTuple tup;
399 : char substate;
400 : bool isnull;
401 : Datum d;
402 : Relation rel;
403 :
404 : /*
405 : * This is to avoid the race condition with AlterSubscription which tries
406 : * to remove this relstate.
407 : */
408 4247 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
409 :
410 : /* Try finding the mapping. */
411 4247 : tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
412 : ObjectIdGetDatum(relid),
413 : ObjectIdGetDatum(subid));
414 :
415 4247 : if (!HeapTupleIsValid(tup))
416 : {
417 25 : table_close(rel, AccessShareLock);
418 25 : *sublsn = InvalidXLogRecPtr;
419 25 : return SUBREL_STATE_UNKNOWN;
420 : }
421 :
422 : /* Get the state. */
423 4222 : substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
424 :
425 : /* Get the LSN */
426 4222 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
427 : Anum_pg_subscription_rel_srsublsn, &isnull);
428 4222 : if (isnull)
429 3649 : *sublsn = InvalidXLogRecPtr;
430 : else
431 573 : *sublsn = DatumGetLSN(d);
432 :
433 : /* Cleanup */
434 4222 : ReleaseSysCache(tup);
435 :
436 4222 : table_close(rel, AccessShareLock);
437 :
438 4222 : return substate;
439 : }
440 :
441 : /*
442 : * Drop subscription relation mapping. These can be for a particular
443 : * subscription, or for a particular relation, or both.
444 : */
445 : void
446 26130 : RemoveSubscriptionRel(Oid subid, Oid relid)
447 : {
448 : Relation rel;
449 : TableScanDesc scan;
450 : ScanKeyData skey[2];
451 : HeapTuple tup;
452 26130 : int nkeys = 0;
453 :
454 26130 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
455 :
456 26130 : if (OidIsValid(subid))
457 : {
458 142 : ScanKeyInit(&skey[nkeys++],
459 : Anum_pg_subscription_rel_srsubid,
460 : BTEqualStrategyNumber,
461 : F_OIDEQ,
462 : ObjectIdGetDatum(subid));
463 : }
464 :
465 26130 : if (OidIsValid(relid))
466 : {
467 26009 : ScanKeyInit(&skey[nkeys++],
468 : Anum_pg_subscription_rel_srrelid,
469 : BTEqualStrategyNumber,
470 : F_OIDEQ,
471 : ObjectIdGetDatum(relid));
472 : }
473 :
474 : /* Do the search and delete what we found. */
475 26130 : scan = table_beginscan_catalog(rel, nkeys, skey);
476 26256 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
477 : {
478 : Form_pg_subscription_rel subrel;
479 :
480 126 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
481 :
482 : /*
483 : * We don't allow to drop the relation mapping when the table
484 : * synchronization is in progress unless the caller updates the
485 : * corresponding subscription as well. This is to ensure that we don't
486 : * leave tablesync slots or origins in the system when the
487 : * corresponding table is dropped. For sequences, however, it's ok to
488 : * drop them since no separate slots or origins are created during
489 : * synchronization.
490 : */
491 126 : if (!OidIsValid(subid) &&
492 16 : subrel->srsubstate != SUBREL_STATE_READY &&
493 0 : get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE)
494 : {
495 0 : ereport(ERROR,
496 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
497 : errmsg("could not drop relation mapping for subscription \"%s\"",
498 : get_subscription_name(subrel->srsubid, false)),
499 : errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
500 : get_rel_name(relid), subrel->srsubstate),
501 :
502 : /*
503 : * translator: first %s is a SQL ALTER command and second %s is a
504 : * SQL DROP command
505 : */
506 : errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
507 : "ALTER SUBSCRIPTION ... ENABLE",
508 : "DROP SUBSCRIPTION ...")));
509 : }
510 :
511 126 : CatalogTupleDelete(rel, &tup->t_self);
512 : }
513 26130 : table_endscan(scan);
514 :
515 26130 : table_close(rel, RowExclusiveLock);
516 26130 : }
517 :
518 : /*
519 : * Does the subscription have any tables?
520 : *
521 : * Use this function only to know true/false, and when you have no need for the
522 : * List returned by GetSubscriptionRelations.
523 : */
524 : bool
525 274 : HasSubscriptionTables(Oid subid)
526 : {
527 : Relation rel;
528 : ScanKeyData skey[1];
529 : SysScanDesc scan;
530 : HeapTuple tup;
531 274 : bool has_subtables = false;
532 :
533 274 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
534 :
535 274 : ScanKeyInit(&skey[0],
536 : Anum_pg_subscription_rel_srsubid,
537 : BTEqualStrategyNumber, F_OIDEQ,
538 : ObjectIdGetDatum(subid));
539 :
540 274 : scan = systable_beginscan(rel, InvalidOid, false,
541 : NULL, 1, skey);
542 :
543 311 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
544 : {
545 : Form_pg_subscription_rel subrel;
546 : char relkind;
547 :
548 296 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
549 296 : relkind = get_rel_relkind(subrel->srrelid);
550 :
551 296 : if (relkind == RELKIND_RELATION ||
552 : relkind == RELKIND_PARTITIONED_TABLE)
553 : {
554 259 : has_subtables = true;
555 259 : break;
556 : }
557 : }
558 :
559 : /* Cleanup */
560 274 : systable_endscan(scan);
561 274 : table_close(rel, AccessShareLock);
562 :
563 274 : return has_subtables;
564 : }
565 :
566 : /*
567 : * Get the relations for the subscription.
568 : *
569 : * If not_ready is true, return only the relations that are not in a ready
570 : * state, otherwise return all the relations of the subscription. The
571 : * returned list is palloc'ed in the current memory context.
572 : */
573 : List *
574 1121 : GetSubscriptionRelations(Oid subid, bool tables, bool sequences,
575 : bool not_ready)
576 : {
577 1121 : List *res = NIL;
578 : Relation rel;
579 : HeapTuple tup;
580 1121 : int nkeys = 0;
581 : ScanKeyData skey[2];
582 : SysScanDesc scan;
583 :
584 : /* One or both of 'tables' and 'sequences' must be true. */
585 : Assert(tables || sequences);
586 :
587 1121 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
588 :
589 1121 : ScanKeyInit(&skey[nkeys++],
590 : Anum_pg_subscription_rel_srsubid,
591 : BTEqualStrategyNumber, F_OIDEQ,
592 : ObjectIdGetDatum(subid));
593 :
594 1121 : if (not_ready)
595 1083 : ScanKeyInit(&skey[nkeys++],
596 : Anum_pg_subscription_rel_srsubstate,
597 : BTEqualStrategyNumber, F_CHARNE,
598 : CharGetDatum(SUBREL_STATE_READY));
599 :
600 1121 : scan = systable_beginscan(rel, InvalidOid, false,
601 : NULL, nkeys, skey);
602 :
603 2945 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
604 : {
605 : Form_pg_subscription_rel subrel;
606 : SubscriptionRelState *relstate;
607 : Datum d;
608 : bool isnull;
609 : char relkind;
610 :
611 1824 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
612 :
613 : /* Relation is either a sequence or a table */
614 1824 : relkind = get_rel_relkind(subrel->srrelid);
615 : Assert(relkind == RELKIND_SEQUENCE || relkind == RELKIND_RELATION ||
616 : relkind == RELKIND_PARTITIONED_TABLE);
617 :
618 : /* Skip sequences if they were not requested */
619 1824 : if ((relkind == RELKIND_SEQUENCE) && !sequences)
620 0 : continue;
621 :
622 : /* Skip tables if they were not requested */
623 1824 : if ((relkind == RELKIND_RELATION ||
624 1797 : relkind == RELKIND_PARTITIONED_TABLE) && !tables)
625 0 : continue;
626 :
627 1824 : relstate = palloc_object(SubscriptionRelState);
628 1824 : relstate->relid = subrel->srrelid;
629 1824 : relstate->state = subrel->srsubstate;
630 1824 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
631 : Anum_pg_subscription_rel_srsublsn, &isnull);
632 1824 : if (isnull)
633 1495 : relstate->lsn = InvalidXLogRecPtr;
634 : else
635 329 : relstate->lsn = DatumGetLSN(d);
636 :
637 1824 : res = lappend(res, relstate);
638 : }
639 :
640 : /* Cleanup */
641 1121 : systable_endscan(scan);
642 1121 : table_close(rel, AccessShareLock);
643 :
644 1121 : return res;
645 : }
646 :
647 : /*
648 : * Update the dead tuple retention status for the given subscription.
649 : */
650 : void
651 2 : UpdateDeadTupleRetentionStatus(Oid subid, bool active)
652 : {
653 : Relation rel;
654 : bool nulls[Natts_pg_subscription];
655 : bool replaces[Natts_pg_subscription];
656 : Datum values[Natts_pg_subscription];
657 : HeapTuple tup;
658 :
659 : /* Look up the subscription in the catalog */
660 2 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
661 2 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
662 :
663 2 : if (!HeapTupleIsValid(tup))
664 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
665 :
666 2 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
667 :
668 : /* Form a new tuple. */
669 2 : memset(values, 0, sizeof(values));
670 2 : memset(nulls, false, sizeof(nulls));
671 2 : memset(replaces, false, sizeof(replaces));
672 :
673 : /* Set the subscription to disabled. */
674 2 : values[Anum_pg_subscription_subretentionactive - 1] = active;
675 2 : replaces[Anum_pg_subscription_subretentionactive - 1] = true;
676 :
677 : /* Update the catalog */
678 2 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
679 : replaces);
680 2 : CatalogTupleUpdate(rel, &tup->t_self, tup);
681 2 : heap_freetuple(tup);
682 :
683 2 : table_close(rel, NoLock);
684 2 : }
|