Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_subscription.c
4 : * replication subscriptions
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/catalog/pg_subscription.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/heapam.h"
19 : #include "access/htup_details.h"
20 : #include "access/tableam.h"
21 : #include "catalog/indexing.h"
22 : #include "catalog/pg_subscription.h"
23 : #include "catalog/pg_subscription_rel.h"
24 : #include "catalog/pg_type.h"
25 : #include "miscadmin.h"
26 : #include "storage/lmgr.h"
27 : #include "utils/array.h"
28 : #include "utils/builtins.h"
29 : #include "utils/fmgroids.h"
30 : #include "utils/lsyscache.h"
31 : #include "utils/pg_lsn.h"
32 : #include "utils/rel.h"
33 : #include "utils/syscache.h"
34 :
35 : static List *textarray_to_stringlist(ArrayType *textarray);
36 :
37 : /*
38 : * Add a comma-separated list of publication names to the 'dest' string.
39 : */
40 : void
41 1018 : GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
42 : {
43 : ListCell *lc;
44 1018 : bool first = true;
45 :
46 : Assert(publications != NIL);
47 :
48 2622 : foreach(lc, publications)
49 : {
50 1604 : char *pubname = strVal(lfirst(lc));
51 :
52 1604 : if (first)
53 1018 : first = false;
54 : else
55 586 : appendStringInfoString(dest, ", ");
56 :
57 1604 : if (quote_literal)
58 1584 : appendStringInfoString(dest, quote_literal_cstr(pubname));
59 : else
60 : {
61 20 : appendStringInfoChar(dest, '"');
62 20 : appendStringInfoString(dest, pubname);
63 20 : appendStringInfoChar(dest, '"');
64 : }
65 : }
66 1018 : }
67 :
68 : /*
69 : * Fetch the subscription from the syscache.
70 : */
71 : Subscription *
72 1684 : GetSubscription(Oid subid, bool missing_ok)
73 : {
74 : HeapTuple tup;
75 : Subscription *sub;
76 : Form_pg_subscription subform;
77 : Datum datum;
78 : bool isnull;
79 :
80 1684 : tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
81 :
82 1684 : if (!HeapTupleIsValid(tup))
83 : {
84 96 : if (missing_ok)
85 96 : return NULL;
86 :
87 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
88 : }
89 :
90 1588 : subform = (Form_pg_subscription) GETSTRUCT(tup);
91 :
92 1588 : sub = (Subscription *) palloc(sizeof(Subscription));
93 1588 : sub->oid = subid;
94 1588 : sub->dbid = subform->subdbid;
95 1588 : sub->skiplsn = subform->subskiplsn;
96 1588 : sub->name = pstrdup(NameStr(subform->subname));
97 1588 : sub->owner = subform->subowner;
98 1588 : sub->enabled = subform->subenabled;
99 1588 : sub->binary = subform->subbinary;
100 1588 : sub->stream = subform->substream;
101 1588 : sub->twophasestate = subform->subtwophasestate;
102 1588 : sub->disableonerr = subform->subdisableonerr;
103 1588 : sub->passwordrequired = subform->subpasswordrequired;
104 1588 : sub->runasowner = subform->subrunasowner;
105 1588 : sub->failover = subform->subfailover;
106 1588 : sub->retaindeadtuples = subform->subretaindeadtuples;
107 1588 : sub->maxretention = subform->submaxretention;
108 1588 : sub->retentionactive = subform->subretentionactive;
109 :
110 : /* Get conninfo */
111 1588 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
112 : tup,
113 : Anum_pg_subscription_subconninfo);
114 1588 : sub->conninfo = TextDatumGetCString(datum);
115 :
116 : /* Get slotname */
117 1588 : datum = SysCacheGetAttr(SUBSCRIPTIONOID,
118 : tup,
119 : Anum_pg_subscription_subslotname,
120 : &isnull);
121 1588 : if (!isnull)
122 1522 : sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
123 : else
124 66 : sub->slotname = NULL;
125 :
126 : /* Get synccommit */
127 1588 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
128 : tup,
129 : Anum_pg_subscription_subsynccommit);
130 1588 : sub->synccommit = TextDatumGetCString(datum);
131 :
132 : /* Get publications */
133 1588 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
134 : tup,
135 : Anum_pg_subscription_subpublications);
136 1588 : sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
137 :
138 : /* Get origin */
139 1588 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
140 : tup,
141 : Anum_pg_subscription_suborigin);
142 1588 : sub->origin = TextDatumGetCString(datum);
143 :
144 : /* Is the subscription owner a superuser? */
145 1588 : sub->ownersuperuser = superuser_arg(sub->owner);
146 :
147 1588 : ReleaseSysCache(tup);
148 :
149 1588 : return sub;
150 : }
151 :
152 : /*
153 : * Return number of subscriptions defined in given database.
154 : * Used by dropdb() to check if database can indeed be dropped.
155 : */
156 : int
157 90 : CountDBSubscriptions(Oid dbid)
158 : {
159 90 : int nsubs = 0;
160 : Relation rel;
161 : ScanKeyData scankey;
162 : SysScanDesc scan;
163 : HeapTuple tup;
164 :
165 90 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
166 :
167 90 : ScanKeyInit(&scankey,
168 : Anum_pg_subscription_subdbid,
169 : BTEqualStrategyNumber, F_OIDEQ,
170 : ObjectIdGetDatum(dbid));
171 :
172 90 : scan = systable_beginscan(rel, InvalidOid, false,
173 : NULL, 1, &scankey);
174 :
175 90 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
176 0 : nsubs++;
177 :
178 90 : systable_endscan(scan);
179 :
180 90 : table_close(rel, NoLock);
181 :
182 90 : return nsubs;
183 : }
184 :
185 : /*
186 : * Free memory allocated by subscription struct.
187 : */
188 : void
189 82 : FreeSubscription(Subscription *sub)
190 : {
191 82 : pfree(sub->name);
192 82 : pfree(sub->conninfo);
193 82 : if (sub->slotname)
194 82 : pfree(sub->slotname);
195 82 : list_free_deep(sub->publications);
196 82 : pfree(sub);
197 82 : }
198 :
199 : /*
200 : * Disable the given subscription.
201 : */
202 : void
203 8 : DisableSubscription(Oid subid)
204 : {
205 : Relation rel;
206 : bool nulls[Natts_pg_subscription];
207 : bool replaces[Natts_pg_subscription];
208 : Datum values[Natts_pg_subscription];
209 : HeapTuple tup;
210 :
211 : /* Look up the subscription in the catalog */
212 8 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
213 8 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
214 :
215 8 : if (!HeapTupleIsValid(tup))
216 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
217 :
218 8 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
219 :
220 : /* Form a new tuple. */
221 8 : memset(values, 0, sizeof(values));
222 8 : memset(nulls, false, sizeof(nulls));
223 8 : memset(replaces, false, sizeof(replaces));
224 :
225 : /* Set the subscription to disabled. */
226 8 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
227 8 : replaces[Anum_pg_subscription_subenabled - 1] = true;
228 :
229 : /* Update the catalog */
230 8 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
231 : replaces);
232 8 : CatalogTupleUpdate(rel, &tup->t_self, tup);
233 8 : heap_freetuple(tup);
234 :
235 8 : table_close(rel, NoLock);
236 8 : }
237 :
238 : /*
239 : * Convert text array to list of strings.
240 : *
241 : * Note: the resulting list of strings is pallocated here.
242 : */
243 : static List *
244 1588 : textarray_to_stringlist(ArrayType *textarray)
245 : {
246 : Datum *elems;
247 : int nelems,
248 : i;
249 1588 : List *res = NIL;
250 :
251 1588 : deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
252 :
253 1588 : if (nelems == 0)
254 0 : return NIL;
255 :
256 3894 : for (i = 0; i < nelems; i++)
257 2306 : res = lappend(res, makeString(TextDatumGetCString(elems[i])));
258 :
259 1588 : return res;
260 : }
261 :
262 : /*
263 : * Add new state record for a subscription table.
264 : *
265 : * If retain_lock is true, then don't release the locks taken in this function.
266 : * We normally release the locks at the end of transaction but in binary-upgrade
267 : * mode, we expect to release those immediately.
268 : */
269 : void
270 412 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
271 : XLogRecPtr sublsn, bool retain_lock)
272 : {
273 : Relation rel;
274 : HeapTuple tup;
275 : bool nulls[Natts_pg_subscription_rel];
276 : Datum values[Natts_pg_subscription_rel];
277 :
278 412 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
279 :
280 412 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
281 :
282 : /* Try finding existing mapping. */
283 412 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
284 : ObjectIdGetDatum(relid),
285 : ObjectIdGetDatum(subid));
286 412 : if (HeapTupleIsValid(tup))
287 0 : elog(ERROR, "subscription relation %u in subscription %u already exists",
288 : relid, subid);
289 :
290 : /* Form the tuple. */
291 412 : memset(values, 0, sizeof(values));
292 412 : memset(nulls, false, sizeof(nulls));
293 412 : values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
294 412 : values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
295 412 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
296 412 : if (sublsn != InvalidXLogRecPtr)
297 2 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
298 : else
299 410 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
300 :
301 412 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
302 :
303 : /* Insert tuple into catalog. */
304 412 : CatalogTupleInsert(rel, tup);
305 :
306 412 : heap_freetuple(tup);
307 :
308 : /* Cleanup. */
309 412 : if (retain_lock)
310 : {
311 408 : table_close(rel, NoLock);
312 : }
313 : else
314 : {
315 4 : table_close(rel, RowExclusiveLock);
316 4 : UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
317 : }
318 412 : }
319 :
320 : /*
321 : * Update the state of a subscription table.
322 : */
323 : void
324 1486 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
325 : XLogRecPtr sublsn, bool already_locked)
326 : {
327 : Relation rel;
328 : HeapTuple tup;
329 : bool nulls[Natts_pg_subscription_rel];
330 : Datum values[Natts_pg_subscription_rel];
331 : bool replaces[Natts_pg_subscription_rel];
332 :
333 1486 : if (already_locked)
334 : {
335 : #ifdef USE_ASSERT_CHECKING
336 : LOCKTAG tag;
337 :
338 : Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
339 : RowExclusiveLock, true));
340 : SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
341 : Assert(LockHeldByMe(&tag, AccessShareLock, true));
342 : #endif
343 :
344 354 : rel = table_open(SubscriptionRelRelationId, NoLock);
345 : }
346 : else
347 : {
348 1132 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
349 1130 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
350 : }
351 :
352 : /* Try finding existing mapping. */
353 1484 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
354 : ObjectIdGetDatum(relid),
355 : ObjectIdGetDatum(subid));
356 1484 : if (!HeapTupleIsValid(tup))
357 0 : elog(ERROR, "subscription table %u in subscription %u does not exist",
358 : relid, subid);
359 :
360 : /* Update the tuple. */
361 1484 : memset(values, 0, sizeof(values));
362 1484 : memset(nulls, false, sizeof(nulls));
363 1484 : memset(replaces, false, sizeof(replaces));
364 :
365 1484 : replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
366 1484 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
367 :
368 1484 : replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
369 1484 : if (sublsn != InvalidXLogRecPtr)
370 722 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
371 : else
372 762 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
373 :
374 1484 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
375 : replaces);
376 :
377 : /* Update the catalog. */
378 1484 : CatalogTupleUpdate(rel, &tup->t_self, tup);
379 :
380 : /* Cleanup. */
381 1484 : table_close(rel, NoLock);
382 1484 : }
383 :
384 : /*
385 : * Get state of subscription table.
386 : *
387 : * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
388 : */
389 : char
390 2416 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
391 : {
392 : HeapTuple tup;
393 : char substate;
394 : bool isnull;
395 : Datum d;
396 : Relation rel;
397 :
398 : /*
399 : * This is to avoid the race condition with AlterSubscription which tries
400 : * to remove this relstate.
401 : */
402 2416 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
403 :
404 : /* Try finding the mapping. */
405 2416 : tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
406 : ObjectIdGetDatum(relid),
407 : ObjectIdGetDatum(subid));
408 :
409 2416 : if (!HeapTupleIsValid(tup))
410 : {
411 44 : table_close(rel, AccessShareLock);
412 44 : *sublsn = InvalidXLogRecPtr;
413 44 : return SUBREL_STATE_UNKNOWN;
414 : }
415 :
416 : /* Get the state. */
417 2372 : substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
418 :
419 : /* Get the LSN */
420 2372 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
421 : Anum_pg_subscription_rel_srsublsn, &isnull);
422 2372 : if (isnull)
423 1344 : *sublsn = InvalidXLogRecPtr;
424 : else
425 1028 : *sublsn = DatumGetLSN(d);
426 :
427 : /* Cleanup */
428 2372 : ReleaseSysCache(tup);
429 :
430 2372 : table_close(rel, AccessShareLock);
431 :
432 2372 : return substate;
433 : }
434 :
435 : /*
436 : * Drop subscription relation mapping. These can be for a particular
437 : * subscription, or for a particular relation, or both.
438 : */
439 : void
440 49200 : RemoveSubscriptionRel(Oid subid, Oid relid)
441 : {
442 : Relation rel;
443 : TableScanDesc scan;
444 : ScanKeyData skey[2];
445 : HeapTuple tup;
446 49200 : int nkeys = 0;
447 :
448 49200 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
449 :
450 49200 : if (OidIsValid(subid))
451 : {
452 272 : ScanKeyInit(&skey[nkeys++],
453 : Anum_pg_subscription_rel_srsubid,
454 : BTEqualStrategyNumber,
455 : F_OIDEQ,
456 : ObjectIdGetDatum(subid));
457 : }
458 :
459 49200 : if (OidIsValid(relid))
460 : {
461 48970 : ScanKeyInit(&skey[nkeys++],
462 : Anum_pg_subscription_rel_srrelid,
463 : BTEqualStrategyNumber,
464 : F_OIDEQ,
465 : ObjectIdGetDatum(relid));
466 : }
467 :
468 : /* Do the search and delete what we found. */
469 49200 : scan = table_beginscan_catalog(rel, nkeys, skey);
470 49430 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
471 : {
472 : Form_pg_subscription_rel subrel;
473 :
474 230 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
475 :
476 : /*
477 : * We don't allow to drop the relation mapping when the table
478 : * synchronization is in progress unless the caller updates the
479 : * corresponding subscription as well. This is to ensure that we don't
480 : * leave tablesync slots or origins in the system when the
481 : * corresponding table is dropped. For sequences, however, it's ok to
482 : * drop them since no separate slots or origins are created during
483 : * synchronization.
484 : */
485 230 : if (!OidIsValid(subid) &&
486 32 : subrel->srsubstate != SUBREL_STATE_READY &&
487 0 : get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE)
488 : {
489 0 : ereport(ERROR,
490 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
491 : errmsg("could not drop relation mapping for subscription \"%s\"",
492 : get_subscription_name(subrel->srsubid, false)),
493 : errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
494 : get_rel_name(relid), subrel->srsubstate),
495 :
496 : /*
497 : * translator: first %s is a SQL ALTER command and second %s is a
498 : * SQL DROP command
499 : */
500 : errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
501 : "ALTER SUBSCRIPTION ... ENABLE",
502 : "DROP SUBSCRIPTION ...")));
503 : }
504 :
505 230 : CatalogTupleDelete(rel, &tup->t_self);
506 : }
507 49200 : table_endscan(scan);
508 :
509 49200 : table_close(rel, RowExclusiveLock);
510 49200 : }
511 :
512 : /*
513 : * Does the subscription have any tables?
514 : *
515 : * Use this function only to know true/false, and when you have no need for the
516 : * List returned by GetSubscriptionRelations.
517 : */
518 : bool
519 502 : HasSubscriptionTables(Oid subid)
520 : {
521 : Relation rel;
522 : ScanKeyData skey[1];
523 : SysScanDesc scan;
524 : HeapTuple tup;
525 502 : bool has_subtables = false;
526 :
527 502 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
528 :
529 502 : ScanKeyInit(&skey[0],
530 : Anum_pg_subscription_rel_srsubid,
531 : BTEqualStrategyNumber, F_OIDEQ,
532 : ObjectIdGetDatum(subid));
533 :
534 502 : scan = systable_beginscan(rel, InvalidOid, false,
535 : NULL, 1, skey);
536 :
537 504 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
538 : {
539 : Form_pg_subscription_rel subrel;
540 : char relkind;
541 :
542 490 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
543 490 : relkind = get_rel_relkind(subrel->srrelid);
544 :
545 490 : if (relkind == RELKIND_RELATION ||
546 : relkind == RELKIND_PARTITIONED_TABLE)
547 : {
548 488 : has_subtables = true;
549 488 : break;
550 : }
551 : }
552 :
553 : /* Cleanup */
554 502 : systable_endscan(scan);
555 502 : table_close(rel, AccessShareLock);
556 :
557 502 : return has_subtables;
558 : }
559 :
560 : /*
561 : * Get the relations for the subscription.
562 : *
563 : * If not_ready is true, return only the relations that are not in a ready
564 : * state, otherwise return all the relations of the subscription. The
565 : * returned list is palloc'ed in the current memory context.
566 : */
567 : List *
568 2044 : GetSubscriptionRelations(Oid subid, bool tables, bool sequences,
569 : bool not_ready)
570 : {
571 2044 : List *res = NIL;
572 : Relation rel;
573 : HeapTuple tup;
574 2044 : int nkeys = 0;
575 : ScanKeyData skey[2];
576 : SysScanDesc scan;
577 :
578 : /* One or both of 'tables' and 'sequences' must be true. */
579 : Assert(tables || sequences);
580 :
581 2044 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
582 :
583 2044 : ScanKeyInit(&skey[nkeys++],
584 : Anum_pg_subscription_rel_srsubid,
585 : BTEqualStrategyNumber, F_OIDEQ,
586 : ObjectIdGetDatum(subid));
587 :
588 2044 : if (not_ready)
589 1976 : ScanKeyInit(&skey[nkeys++],
590 : Anum_pg_subscription_rel_srsubstate,
591 : BTEqualStrategyNumber, F_CHARNE,
592 : CharGetDatum(SUBREL_STATE_READY));
593 :
594 2044 : scan = systable_beginscan(rel, InvalidOid, false,
595 : NULL, nkeys, skey);
596 :
597 5120 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
598 : {
599 : Form_pg_subscription_rel subrel;
600 : SubscriptionRelState *relstate;
601 : Datum d;
602 : bool isnull;
603 : char relkind;
604 :
605 3076 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
606 :
607 : /* Relation is either a sequence or a table */
608 3076 : relkind = get_rel_relkind(subrel->srrelid);
609 : Assert(relkind == RELKIND_SEQUENCE || relkind == RELKIND_RELATION ||
610 : relkind == RELKIND_PARTITIONED_TABLE);
611 :
612 : /* Skip sequences if they were not requested */
613 3076 : if ((relkind == RELKIND_SEQUENCE) && !sequences)
614 2 : continue;
615 :
616 : /* Skip tables if they were not requested */
617 3074 : if ((relkind == RELKIND_RELATION ||
618 3074 : relkind == RELKIND_PARTITIONED_TABLE) && !tables)
619 0 : continue;
620 :
621 3074 : relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
622 3074 : relstate->relid = subrel->srrelid;
623 3074 : relstate->state = subrel->srsubstate;
624 3074 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
625 : Anum_pg_subscription_rel_srsublsn, &isnull);
626 3074 : if (isnull)
627 2540 : relstate->lsn = InvalidXLogRecPtr;
628 : else
629 534 : relstate->lsn = DatumGetLSN(d);
630 :
631 3074 : res = lappend(res, relstate);
632 : }
633 :
634 : /* Cleanup */
635 2044 : systable_endscan(scan);
636 2044 : table_close(rel, AccessShareLock);
637 :
638 2044 : return res;
639 : }
640 :
641 : /*
642 : * Update the dead tuple retention status for the given subscription.
643 : */
644 : void
645 4 : UpdateDeadTupleRetentionStatus(Oid subid, bool active)
646 : {
647 : Relation rel;
648 : bool nulls[Natts_pg_subscription];
649 : bool replaces[Natts_pg_subscription];
650 : Datum values[Natts_pg_subscription];
651 : HeapTuple tup;
652 :
653 : /* Look up the subscription in the catalog */
654 4 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
655 4 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
656 :
657 4 : if (!HeapTupleIsValid(tup))
658 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
659 :
660 4 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
661 :
662 : /* Form a new tuple. */
663 4 : memset(values, 0, sizeof(values));
664 4 : memset(nulls, false, sizeof(nulls));
665 4 : memset(replaces, false, sizeof(replaces));
666 :
667 : /* Set the subscription to disabled. */
668 4 : values[Anum_pg_subscription_subretentionactive - 1] = active;
669 4 : replaces[Anum_pg_subscription_subretentionactive - 1] = true;
670 :
671 : /* Update the catalog */
672 4 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
673 : replaces);
674 4 : CatalogTupleUpdate(rel, &tup->t_self, tup);
675 4 : heap_freetuple(tup);
676 :
677 4 : table_close(rel, NoLock);
678 4 : }
|