Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_subscription.c
4 : * replication subscriptions
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/catalog/pg_subscription.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/heapam.h"
19 : #include "access/htup_details.h"
20 : #include "access/tableam.h"
21 : #include "catalog/indexing.h"
22 : #include "catalog/pg_foreign_server.h"
23 : #include "catalog/pg_subscription.h"
24 : #include "catalog/pg_subscription_rel.h"
25 : #include "catalog/pg_type.h"
26 : #include "foreign/foreign.h"
27 : #include "miscadmin.h"
28 : #include "storage/lmgr.h"
29 : #include "storage/lock.h"
30 : #include "utils/acl.h"
31 : #include "utils/array.h"
32 : #include "utils/builtins.h"
33 : #include "utils/fmgroids.h"
34 : #include "utils/lsyscache.h"
35 : #include "utils/memutils.h"
36 : #include "utils/pg_lsn.h"
37 : #include "utils/rel.h"
38 : #include "utils/syscache.h"
39 :
40 : static List *textarray_to_stringlist(ArrayType *textarray);
41 :
42 : /*
43 : * Add a comma-separated list of publication names to the 'dest' string.
44 : */
45 : void
46 543 : GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
47 : {
48 : ListCell *lc;
49 543 : bool first = true;
50 :
51 : Assert(publications != NIL);
52 :
53 1395 : foreach(lc, publications)
54 : {
55 852 : char *pubname = strVal(lfirst(lc));
56 :
57 852 : if (first)
58 543 : first = false;
59 : else
60 309 : appendStringInfoString(dest, ", ");
61 :
62 852 : if (quote_literal)
63 842 : appendStringInfoString(dest, quote_literal_cstr(pubname));
64 : else
65 : {
66 10 : appendStringInfoChar(dest, '"');
67 10 : appendStringInfoString(dest, pubname);
68 10 : appendStringInfoChar(dest, '"');
69 : }
70 : }
71 543 : }
72 :
73 : /*
74 : * Fetch the subscription from the syscache.
75 : */
76 : Subscription *
77 1032 : GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
78 : {
79 : HeapTuple tup;
80 : Subscription *sub;
81 : Form_pg_subscription subform;
82 : Datum datum;
83 : bool isnull;
84 : MemoryContext cxt;
85 : MemoryContext oldcxt;
86 :
87 1032 : tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
88 :
89 1032 : if (!HeapTupleIsValid(tup))
90 : {
91 71 : if (missing_ok)
92 71 : return NULL;
93 :
94 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
95 : }
96 :
97 961 : cxt = AllocSetContextCreate(CurrentMemoryContext, "subscription",
98 : ALLOCSET_SMALL_SIZES);
99 961 : oldcxt = MemoryContextSwitchTo(cxt);
100 :
101 961 : subform = (Form_pg_subscription) GETSTRUCT(tup);
102 :
103 961 : sub = palloc_object(Subscription);
104 961 : sub->cxt = cxt;
105 961 : sub->oid = subid;
106 961 : sub->dbid = subform->subdbid;
107 961 : sub->skiplsn = subform->subskiplsn;
108 961 : sub->name = pstrdup(NameStr(subform->subname));
109 961 : sub->owner = subform->subowner;
110 961 : sub->enabled = subform->subenabled;
111 961 : sub->binary = subform->subbinary;
112 961 : sub->stream = subform->substream;
113 961 : sub->twophasestate = subform->subtwophasestate;
114 961 : sub->disableonerr = subform->subdisableonerr;
115 961 : sub->passwordrequired = subform->subpasswordrequired;
116 961 : sub->runasowner = subform->subrunasowner;
117 961 : sub->failover = subform->subfailover;
118 961 : sub->retaindeadtuples = subform->subretaindeadtuples;
119 961 : sub->maxretention = subform->submaxretention;
120 961 : sub->retentionactive = subform->subretentionactive;
121 :
122 : /* Get conninfo */
123 961 : if (OidIsValid(subform->subserver))
124 : {
125 : AclResult aclresult;
126 : ForeignServer *server;
127 :
128 8 : server = GetForeignServer(subform->subserver);
129 :
130 : /* recheck ACL if requested */
131 8 : if (aclcheck)
132 : {
133 3 : aclresult = object_aclcheck(ForeignServerRelationId,
134 : subform->subserver,
135 : subform->subowner, ACL_USAGE);
136 :
137 3 : if (aclresult != ACLCHECK_OK)
138 0 : ereport(ERROR,
139 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
140 : errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
141 : GetUserNameFromId(subform->subowner, false),
142 : server->servername)));
143 : }
144 :
145 8 : sub->conninfo = ForeignServerConnectionString(subform->subowner,
146 : server);
147 : }
148 : else
149 : {
150 953 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
151 : tup,
152 : Anum_pg_subscription_subconninfo);
153 953 : sub->conninfo = TextDatumGetCString(datum);
154 : }
155 :
156 : /* Get slotname */
157 961 : datum = SysCacheGetAttr(SUBSCRIPTIONOID,
158 : tup,
159 : Anum_pg_subscription_subslotname,
160 : &isnull);
161 961 : if (!isnull)
162 917 : sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
163 : else
164 44 : sub->slotname = NULL;
165 :
166 : /* Get synccommit */
167 961 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
168 : tup,
169 : Anum_pg_subscription_subsynccommit);
170 961 : sub->synccommit = TextDatumGetCString(datum);
171 :
172 : /* Get walrcvtimeout */
173 961 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
174 : tup,
175 : Anum_pg_subscription_subwalrcvtimeout);
176 961 : sub->walrcvtimeout = TextDatumGetCString(datum);
177 :
178 : /* Get publications */
179 961 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
180 : tup,
181 : Anum_pg_subscription_subpublications);
182 961 : sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
183 :
184 : /* Get origin */
185 961 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
186 : tup,
187 : Anum_pg_subscription_suborigin);
188 961 : sub->origin = TextDatumGetCString(datum);
189 :
190 : /* Is the subscription owner a superuser? */
191 961 : sub->ownersuperuser = superuser_arg(sub->owner);
192 :
193 961 : ReleaseSysCache(tup);
194 :
195 961 : MemoryContextSwitchTo(oldcxt);
196 :
197 961 : return sub;
198 : }
199 :
200 : /*
201 : * Return number of subscriptions defined in given database.
202 : * Used by dropdb() to check if database can indeed be dropped.
203 : */
204 : int
205 51 : CountDBSubscriptions(Oid dbid)
206 : {
207 51 : int nsubs = 0;
208 : Relation rel;
209 : ScanKeyData scankey;
210 : SysScanDesc scan;
211 : HeapTuple tup;
212 :
213 51 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
214 :
215 51 : ScanKeyInit(&scankey,
216 : Anum_pg_subscription_subdbid,
217 : BTEqualStrategyNumber, F_OIDEQ,
218 : ObjectIdGetDatum(dbid));
219 :
220 51 : scan = systable_beginscan(rel, InvalidOid, false,
221 : NULL, 1, &scankey);
222 :
223 51 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
224 0 : nsubs++;
225 :
226 51 : systable_endscan(scan);
227 :
228 51 : table_close(rel, NoLock);
229 :
230 51 : return nsubs;
231 : }
232 :
233 : /*
234 : * Disable the given subscription.
235 : */
236 : void
237 4 : DisableSubscription(Oid subid)
238 : {
239 : Relation rel;
240 : bool nulls[Natts_pg_subscription];
241 : bool replaces[Natts_pg_subscription];
242 : Datum values[Natts_pg_subscription];
243 : HeapTuple tup;
244 :
245 : /* Look up the subscription in the catalog */
246 4 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
247 4 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
248 :
249 4 : if (!HeapTupleIsValid(tup))
250 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
251 :
252 4 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
253 :
254 : /* Form a new tuple. */
255 4 : memset(values, 0, sizeof(values));
256 4 : memset(nulls, false, sizeof(nulls));
257 4 : memset(replaces, false, sizeof(replaces));
258 :
259 : /* Set the subscription to disabled. */
260 4 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
261 4 : replaces[Anum_pg_subscription_subenabled - 1] = true;
262 :
263 : /* Update the catalog */
264 4 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
265 : replaces);
266 4 : CatalogTupleUpdate(rel, &tup->t_self, tup);
267 4 : heap_freetuple(tup);
268 :
269 4 : table_close(rel, NoLock);
270 4 : }
271 :
272 : /*
273 : * Convert text array to list of strings.
274 : *
275 : * Note: the resulting list of strings is pallocated here.
276 : */
277 : static List *
278 961 : textarray_to_stringlist(ArrayType *textarray)
279 : {
280 : Datum *elems;
281 : int nelems,
282 : i;
283 961 : List *res = NIL;
284 :
285 961 : deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
286 :
287 961 : if (nelems == 0)
288 0 : return NIL;
289 :
290 2348 : for (i = 0; i < nelems; i++)
291 1387 : res = lappend(res, makeString(TextDatumGetCString(elems[i])));
292 :
293 961 : return res;
294 : }
295 :
296 : /*
297 : * Add new state record for a subscription table.
298 : *
299 : * If retain_lock is true, then don't release the locks taken in this function.
300 : * We normally release the locks at the end of transaction but in binary-upgrade
301 : * mode, we expect to release those immediately.
302 : */
303 : void
304 227 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
305 : XLogRecPtr sublsn, bool retain_lock)
306 : {
307 : Relation rel;
308 : HeapTuple tup;
309 : bool nulls[Natts_pg_subscription_rel];
310 : Datum values[Natts_pg_subscription_rel];
311 :
312 227 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
313 :
314 227 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
315 :
316 : /* Try finding existing mapping. */
317 227 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
318 : ObjectIdGetDatum(relid),
319 : ObjectIdGetDatum(subid));
320 227 : if (HeapTupleIsValid(tup))
321 0 : elog(ERROR, "subscription relation %u in subscription %u already exists",
322 : relid, subid);
323 :
324 : /* Form the tuple. */
325 227 : memset(values, 0, sizeof(values));
326 227 : memset(nulls, false, sizeof(nulls));
327 227 : values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
328 227 : values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
329 227 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
330 227 : if (XLogRecPtrIsValid(sublsn))
331 2 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
332 : else
333 225 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
334 :
335 227 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
336 :
337 : /* Insert tuple into catalog. */
338 227 : CatalogTupleInsert(rel, tup);
339 :
340 227 : heap_freetuple(tup);
341 :
342 : /* Cleanup. */
343 227 : if (retain_lock)
344 : {
345 224 : table_close(rel, NoLock);
346 : }
347 : else
348 : {
349 3 : table_close(rel, RowExclusiveLock);
350 3 : UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
351 : }
352 227 : }
353 :
354 : /*
355 : * Update the state of a subscription table.
356 : */
357 : void
358 818 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
359 : XLogRecPtr sublsn, bool already_locked)
360 : {
361 : Relation rel;
362 : HeapTuple tup;
363 : bool nulls[Natts_pg_subscription_rel];
364 : Datum values[Natts_pg_subscription_rel];
365 : bool replaces[Natts_pg_subscription_rel];
366 :
367 818 : if (already_locked)
368 : {
369 : #ifdef USE_ASSERT_CHECKING
370 : LOCKTAG tag;
371 :
372 : Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
373 : RowExclusiveLock, true));
374 : SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
375 : Assert(LockHeldByMe(&tag, AccessShareLock, true));
376 : #endif
377 :
378 196 : rel = table_open(SubscriptionRelRelationId, NoLock);
379 : }
380 : else
381 : {
382 622 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
383 622 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
384 : }
385 :
386 : /* Try finding existing mapping. */
387 818 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
388 : ObjectIdGetDatum(relid),
389 : ObjectIdGetDatum(subid));
390 818 : if (!HeapTupleIsValid(tup))
391 0 : elog(ERROR, "subscription relation %u in subscription %u does not exist",
392 : relid, subid);
393 :
394 : /* Update the tuple. */
395 818 : memset(values, 0, sizeof(values));
396 818 : memset(nulls, false, sizeof(nulls));
397 818 : memset(replaces, false, sizeof(replaces));
398 :
399 818 : replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
400 818 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
401 :
402 818 : replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
403 818 : if (XLogRecPtrIsValid(sublsn))
404 403 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
405 : else
406 415 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
407 :
408 818 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
409 : replaces);
410 :
411 : /* Update the catalog. */
412 818 : CatalogTupleUpdate(rel, &tup->t_self, tup);
413 :
414 : /* Cleanup. */
415 818 : table_close(rel, NoLock);
416 818 : }
417 :
418 : /*
419 : * Get state of subscription table.
420 : *
421 : * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
422 : */
423 : char
424 1305 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
425 : {
426 : HeapTuple tup;
427 : char substate;
428 : bool isnull;
429 : Datum d;
430 : Relation rel;
431 :
432 : /*
433 : * This is to avoid the race condition with AlterSubscription which tries
434 : * to remove this relstate.
435 : */
436 1305 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
437 :
438 : /* Try finding the mapping. */
439 1305 : tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
440 : ObjectIdGetDatum(relid),
441 : ObjectIdGetDatum(subid));
442 :
443 1305 : if (!HeapTupleIsValid(tup))
444 : {
445 35 : table_close(rel, AccessShareLock);
446 35 : *sublsn = InvalidXLogRecPtr;
447 35 : return SUBREL_STATE_UNKNOWN;
448 : }
449 :
450 : /* Get the state. */
451 1270 : substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
452 :
453 : /* Get the LSN */
454 1270 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
455 : Anum_pg_subscription_rel_srsublsn, &isnull);
456 1270 : if (isnull)
457 670 : *sublsn = InvalidXLogRecPtr;
458 : else
459 600 : *sublsn = DatumGetLSN(d);
460 :
461 : /* Cleanup */
462 1270 : ReleaseSysCache(tup);
463 :
464 1270 : table_close(rel, AccessShareLock);
465 :
466 1270 : return substate;
467 : }
468 :
469 : /*
470 : * Drop subscription relation mapping. These can be for a particular
471 : * subscription, or for a particular relation, or both.
472 : */
473 : void
474 32373 : RemoveSubscriptionRel(Oid subid, Oid relid)
475 : {
476 : Relation rel;
477 : TableScanDesc scan;
478 : ScanKeyData skey[2];
479 : HeapTuple tup;
480 32373 : int nkeys = 0;
481 :
482 32373 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
483 :
484 32373 : if (OidIsValid(subid))
485 : {
486 164 : ScanKeyInit(&skey[nkeys++],
487 : Anum_pg_subscription_rel_srsubid,
488 : BTEqualStrategyNumber,
489 : F_OIDEQ,
490 : ObjectIdGetDatum(subid));
491 : }
492 :
493 32373 : if (OidIsValid(relid))
494 : {
495 32230 : ScanKeyInit(&skey[nkeys++],
496 : Anum_pg_subscription_rel_srrelid,
497 : BTEqualStrategyNumber,
498 : F_OIDEQ,
499 : ObjectIdGetDatum(relid));
500 : }
501 :
502 : /* Do the search and delete what we found. */
503 32373 : scan = table_beginscan_catalog(rel, nkeys, skey);
504 32503 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
505 : {
506 : Form_pg_subscription_rel subrel;
507 :
508 130 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
509 :
510 : /*
511 : * We don't allow to drop the relation mapping when the table
512 : * synchronization is in progress unless the caller updates the
513 : * corresponding subscription as well. This is to ensure that we don't
514 : * leave tablesync slots or origins in the system when the
515 : * corresponding table is dropped. For sequences, however, it's ok to
516 : * drop them since no separate slots or origins are created during
517 : * synchronization.
518 : */
519 130 : if (!OidIsValid(subid) &&
520 16 : subrel->srsubstate != SUBREL_STATE_READY &&
521 0 : get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE)
522 : {
523 0 : ereport(ERROR,
524 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
525 : errmsg("could not drop relation mapping for subscription \"%s\"",
526 : get_subscription_name(subrel->srsubid, false)),
527 : errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
528 : get_rel_name(relid), subrel->srsubstate),
529 :
530 : /*
531 : * translator: first %s is a SQL ALTER command and second %s is a
532 : * SQL DROP command
533 : */
534 : errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
535 : "ALTER SUBSCRIPTION ... ENABLE",
536 : "DROP SUBSCRIPTION ...")));
537 : }
538 :
539 130 : CatalogTupleDelete(rel, &tup->t_self);
540 : }
541 32373 : table_endscan(scan);
542 :
543 32373 : table_close(rel, RowExclusiveLock);
544 32373 : }
545 :
546 : /*
547 : * Does the subscription have any tables?
548 : *
549 : * Use this function only to know true/false, and when you have no need for the
550 : * List returned by GetSubscriptionRelations.
551 : */
552 : bool
553 303 : HasSubscriptionTables(Oid subid)
554 : {
555 : Relation rel;
556 : ScanKeyData skey[1];
557 : SysScanDesc scan;
558 : HeapTuple tup;
559 303 : bool has_subtables = false;
560 :
561 303 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
562 :
563 303 : ScanKeyInit(&skey[0],
564 : Anum_pg_subscription_rel_srsubid,
565 : BTEqualStrategyNumber, F_OIDEQ,
566 : ObjectIdGetDatum(subid));
567 :
568 303 : scan = systable_beginscan(rel, InvalidOid, false,
569 : NULL, 1, skey);
570 :
571 316 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
572 : {
573 : Form_pg_subscription_rel subrel;
574 : char relkind;
575 :
576 309 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
577 309 : relkind = get_rel_relkind(subrel->srrelid);
578 :
579 309 : if (relkind == RELKIND_RELATION ||
580 : relkind == RELKIND_PARTITIONED_TABLE)
581 : {
582 296 : has_subtables = true;
583 296 : break;
584 : }
585 : }
586 :
587 : /* Cleanup */
588 303 : systable_endscan(scan);
589 303 : table_close(rel, AccessShareLock);
590 :
591 303 : return has_subtables;
592 : }
593 :
594 : /*
595 : * Get the relations for the subscription.
596 : *
597 : * If not_ready is true, return only the relations that are not in a ready
598 : * state, otherwise return all the relations of the subscription. The
599 : * returned list is palloc'ed in the current memory context.
600 : */
601 : List *
602 1225 : GetSubscriptionRelations(Oid subid, bool tables, bool sequences,
603 : bool not_ready)
604 : {
605 1225 : List *res = NIL;
606 : Relation rel;
607 : HeapTuple tup;
608 1225 : int nkeys = 0;
609 : ScanKeyData skey[2];
610 : SysScanDesc scan;
611 :
612 : /* One or both of 'tables' and 'sequences' must be true. */
613 : Assert(tables || sequences);
614 :
615 1225 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
616 :
617 1225 : ScanKeyInit(&skey[nkeys++],
618 : Anum_pg_subscription_rel_srsubid,
619 : BTEqualStrategyNumber, F_OIDEQ,
620 : ObjectIdGetDatum(subid));
621 :
622 1225 : if (not_ready)
623 1186 : ScanKeyInit(&skey[nkeys++],
624 : Anum_pg_subscription_rel_srsubstate,
625 : BTEqualStrategyNumber, F_CHARNE,
626 : CharGetDatum(SUBREL_STATE_READY));
627 :
628 1225 : scan = systable_beginscan(rel, InvalidOid, false,
629 : NULL, nkeys, skey);
630 :
631 3179 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
632 : {
633 : Form_pg_subscription_rel subrel;
634 : SubscriptionRelState *relstate;
635 : Datum d;
636 : bool isnull;
637 : char relkind;
638 :
639 1954 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
640 :
641 : /* Relation is either a sequence or a table */
642 1954 : relkind = get_rel_relkind(subrel->srrelid);
643 : Assert(relkind == RELKIND_SEQUENCE || relkind == RELKIND_RELATION ||
644 : relkind == RELKIND_PARTITIONED_TABLE);
645 :
646 : /* Skip sequences if they were not requested */
647 1954 : if ((relkind == RELKIND_SEQUENCE) && !sequences)
648 0 : continue;
649 :
650 : /* Skip tables if they were not requested */
651 1954 : if ((relkind == RELKIND_RELATION ||
652 1932 : relkind == RELKIND_PARTITIONED_TABLE) && !tables)
653 0 : continue;
654 :
655 1954 : relstate = palloc_object(SubscriptionRelState);
656 1954 : relstate->relid = subrel->srrelid;
657 1954 : relstate->state = subrel->srsubstate;
658 1954 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
659 : Anum_pg_subscription_rel_srsublsn, &isnull);
660 1954 : if (isnull)
661 1606 : relstate->lsn = InvalidXLogRecPtr;
662 : else
663 348 : relstate->lsn = DatumGetLSN(d);
664 :
665 1954 : res = lappend(res, relstate);
666 : }
667 :
668 : /* Cleanup */
669 1225 : systable_endscan(scan);
670 1225 : table_close(rel, AccessShareLock);
671 :
672 1225 : return res;
673 : }
674 :
675 : /*
676 : * Update the dead tuple retention status for the given subscription.
677 : */
678 : void
679 2 : UpdateDeadTupleRetentionStatus(Oid subid, bool active)
680 : {
681 : Relation rel;
682 : bool nulls[Natts_pg_subscription];
683 : bool replaces[Natts_pg_subscription];
684 : Datum values[Natts_pg_subscription];
685 : HeapTuple tup;
686 :
687 : /* Look up the subscription in the catalog */
688 2 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
689 2 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
690 :
691 2 : if (!HeapTupleIsValid(tup))
692 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
693 :
694 2 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
695 :
696 : /* Form a new tuple. */
697 2 : memset(values, 0, sizeof(values));
698 2 : memset(nulls, false, sizeof(nulls));
699 2 : memset(replaces, false, sizeof(replaces));
700 :
701 : /* Set the subscription to disabled. */
702 2 : values[Anum_pg_subscription_subretentionactive - 1] = active;
703 2 : replaces[Anum_pg_subscription_subretentionactive - 1] = true;
704 :
705 : /* Update the catalog */
706 2 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
707 : replaces);
708 2 : CatalogTupleUpdate(rel, &tup->t_self, tup);
709 2 : heap_freetuple(tup);
710 :
711 2 : table_close(rel, NoLock);
712 2 : }
|