Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_subscription.c
4 : * replication subscriptions
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/catalog/pg_subscription.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/heapam.h"
19 : #include "access/htup_details.h"
20 : #include "access/tableam.h"
21 : #include "catalog/indexing.h"
22 : #include "catalog/pg_foreign_server.h"
23 : #include "catalog/pg_subscription.h"
24 : #include "catalog/pg_subscription_rel.h"
25 : #include "catalog/pg_type.h"
26 : #include "foreign/foreign.h"
27 : #include "miscadmin.h"
28 : #include "storage/lmgr.h"
29 : #include "storage/lock.h"
30 : #include "utils/acl.h"
31 : #include "utils/array.h"
32 : #include "utils/builtins.h"
33 : #include "utils/fmgroids.h"
34 : #include "utils/lsyscache.h"
35 : #include "utils/memutils.h"
36 : #include "utils/pg_lsn.h"
37 : #include "utils/rel.h"
38 : #include "utils/syscache.h"
39 :
40 : static List *textarray_to_stringlist(ArrayType *textarray);
41 :
42 : /*
43 : * Add a comma-separated list of publication names to the 'dest' string.
44 : *
45 : * If quote_literal is true, the returned list can be used to construct an SQL
46 : * command, thus no translation is applied. Otherwise, the string can be used
47 : * to create a user-facing message, so translatable quote marks are added.
48 : */
49 : void
50 547 : GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
51 : {
52 : ListCell *lc;
53 547 : bool first = true;
54 :
55 : Assert(publications != NIL);
56 :
57 1402 : foreach(lc, publications)
58 : {
59 855 : char *pubname = strVal(lfirst(lc));
60 :
61 855 : if (quote_literal)
62 : {
63 845 : if (!first)
64 307 : appendStringInfoString(dest, ", ");
65 845 : appendStringInfoString(dest, quote_literal_cstr(pubname));
66 : }
67 : else
68 : {
69 10 : if (first)
70 9 : appendStringInfo(dest, _("\"%s\""), pubname);
71 : else
72 1 : appendStringInfo(dest, _(", \"%s\""), pubname);
73 : }
74 :
75 855 : first = false;
76 : }
77 547 : }
78 :
79 : /*
80 : * Fetch the subscription from the syscache.
81 : */
82 : Subscription *
83 1054 : GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
84 : {
85 : HeapTuple tup;
86 : Subscription *sub;
87 : Form_pg_subscription subform;
88 : Datum datum;
89 : bool isnull;
90 : MemoryContext cxt;
91 : MemoryContext oldcxt;
92 :
93 1054 : tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
94 :
95 1054 : if (!HeapTupleIsValid(tup))
96 : {
97 70 : if (missing_ok)
98 70 : return NULL;
99 :
100 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
101 : }
102 :
103 984 : cxt = AllocSetContextCreate(CurrentMemoryContext, "subscription",
104 : ALLOCSET_SMALL_SIZES);
105 984 : oldcxt = MemoryContextSwitchTo(cxt);
106 :
107 984 : subform = (Form_pg_subscription) GETSTRUCT(tup);
108 :
109 984 : sub = palloc_object(Subscription);
110 984 : sub->cxt = cxt;
111 984 : sub->oid = subid;
112 984 : sub->dbid = subform->subdbid;
113 984 : sub->skiplsn = subform->subskiplsn;
114 984 : sub->name = pstrdup(NameStr(subform->subname));
115 984 : sub->owner = subform->subowner;
116 984 : sub->enabled = subform->subenabled;
117 984 : sub->binary = subform->subbinary;
118 984 : sub->stream = subform->substream;
119 984 : sub->twophasestate = subform->subtwophasestate;
120 984 : sub->disableonerr = subform->subdisableonerr;
121 984 : sub->passwordrequired = subform->subpasswordrequired;
122 984 : sub->runasowner = subform->subrunasowner;
123 984 : sub->failover = subform->subfailover;
124 984 : sub->retaindeadtuples = subform->subretaindeadtuples;
125 984 : sub->maxretention = subform->submaxretention;
126 984 : sub->retentionactive = subform->subretentionactive;
127 :
128 : /* Get conninfo */
129 984 : if (OidIsValid(subform->subserver))
130 : {
131 : AclResult aclresult;
132 : ForeignServer *server;
133 :
134 10 : server = GetForeignServer(subform->subserver);
135 :
136 : /* recheck ACL if requested */
137 10 : if (aclcheck)
138 : {
139 5 : aclresult = object_aclcheck(ForeignServerRelationId,
140 : subform->subserver,
141 : subform->subowner, ACL_USAGE);
142 :
143 5 : if (aclresult != ACLCHECK_OK)
144 0 : ereport(ERROR,
145 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
146 : errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
147 : GetUserNameFromId(subform->subowner, false),
148 : server->servername)));
149 : }
150 :
151 10 : sub->conninfo = ForeignServerConnectionString(subform->subowner,
152 : server);
153 : }
154 : else
155 : {
156 974 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
157 : tup,
158 : Anum_pg_subscription_subconninfo);
159 974 : sub->conninfo = TextDatumGetCString(datum);
160 : }
161 :
162 : /* Get slotname */
163 984 : datum = SysCacheGetAttr(SUBSCRIPTIONOID,
164 : tup,
165 : Anum_pg_subscription_subslotname,
166 : &isnull);
167 984 : if (!isnull)
168 940 : sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
169 : else
170 44 : sub->slotname = NULL;
171 :
172 : /* Get synccommit */
173 984 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
174 : tup,
175 : Anum_pg_subscription_subsynccommit);
176 984 : sub->synccommit = TextDatumGetCString(datum);
177 :
178 : /* Get walrcvtimeout */
179 984 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
180 : tup,
181 : Anum_pg_subscription_subwalrcvtimeout);
182 984 : sub->walrcvtimeout = TextDatumGetCString(datum);
183 :
184 : /* Get publications */
185 984 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
186 : tup,
187 : Anum_pg_subscription_subpublications);
188 984 : sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
189 :
190 : /* Get origin */
191 984 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
192 : tup,
193 : Anum_pg_subscription_suborigin);
194 984 : sub->origin = TextDatumGetCString(datum);
195 :
196 : /* Is the subscription owner a superuser? */
197 984 : sub->ownersuperuser = superuser_arg(sub->owner);
198 :
199 984 : ReleaseSysCache(tup);
200 :
201 984 : MemoryContextSwitchTo(oldcxt);
202 :
203 984 : return sub;
204 : }
205 :
206 : /*
207 : * Return number of subscriptions defined in given database.
208 : * Used by dropdb() to check if database can indeed be dropped.
209 : */
210 : int
211 50 : CountDBSubscriptions(Oid dbid)
212 : {
213 50 : int nsubs = 0;
214 : Relation rel;
215 : ScanKeyData scankey;
216 : SysScanDesc scan;
217 : HeapTuple tup;
218 :
219 50 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
220 :
221 50 : ScanKeyInit(&scankey,
222 : Anum_pg_subscription_subdbid,
223 : BTEqualStrategyNumber, F_OIDEQ,
224 : ObjectIdGetDatum(dbid));
225 :
226 50 : scan = systable_beginscan(rel, InvalidOid, false,
227 : NULL, 1, &scankey);
228 :
229 50 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
230 0 : nsubs++;
231 :
232 50 : systable_endscan(scan);
233 :
234 50 : table_close(rel, NoLock);
235 :
236 50 : return nsubs;
237 : }
238 :
239 : /*
240 : * Disable the given subscription.
241 : */
242 : void
243 4 : DisableSubscription(Oid subid)
244 : {
245 : Relation rel;
246 : bool nulls[Natts_pg_subscription];
247 : bool replaces[Natts_pg_subscription];
248 : Datum values[Natts_pg_subscription];
249 : HeapTuple tup;
250 :
251 : /* Look up the subscription in the catalog */
252 4 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
253 4 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
254 :
255 4 : if (!HeapTupleIsValid(tup))
256 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
257 :
258 4 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
259 :
260 : /* Form a new tuple. */
261 4 : memset(values, 0, sizeof(values));
262 4 : memset(nulls, false, sizeof(nulls));
263 4 : memset(replaces, false, sizeof(replaces));
264 :
265 : /* Set the subscription to disabled. */
266 4 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
267 4 : replaces[Anum_pg_subscription_subenabled - 1] = true;
268 :
269 : /* Update the catalog */
270 4 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
271 : replaces);
272 4 : CatalogTupleUpdate(rel, &tup->t_self, tup);
273 4 : heap_freetuple(tup);
274 :
275 4 : table_close(rel, NoLock);
276 4 : }
277 :
278 : /*
279 : * Convert text array to list of strings.
280 : *
281 : * Note: the resulting list of strings is pallocated here.
282 : */
283 : static List *
284 984 : textarray_to_stringlist(ArrayType *textarray)
285 : {
286 : Datum *elems;
287 : int nelems,
288 : i;
289 984 : List *res = NIL;
290 :
291 984 : deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
292 :
293 984 : if (nelems == 0)
294 0 : return NIL;
295 :
296 2389 : for (i = 0; i < nelems; i++)
297 1405 : res = lappend(res, makeString(TextDatumGetCString(elems[i])));
298 :
299 984 : return res;
300 : }
301 :
302 : /*
303 : * Add new state record for a subscription table.
304 : *
305 : * If retain_lock is true, then don't release the locks taken in this function.
306 : * We normally release the locks at the end of transaction but in binary-upgrade
307 : * mode, we expect to release those immediately.
308 : */
309 : void
310 228 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
311 : XLogRecPtr sublsn, bool retain_lock)
312 : {
313 : Relation rel;
314 : HeapTuple tup;
315 : bool nulls[Natts_pg_subscription_rel];
316 : Datum values[Natts_pg_subscription_rel];
317 :
318 228 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
319 :
320 228 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
321 :
322 : /* Try finding existing mapping. */
323 228 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
324 : ObjectIdGetDatum(relid),
325 : ObjectIdGetDatum(subid));
326 228 : if (HeapTupleIsValid(tup))
327 0 : elog(ERROR, "subscription relation %u in subscription %u already exists",
328 : relid, subid);
329 :
330 : /* Form the tuple. */
331 228 : memset(values, 0, sizeof(values));
332 228 : memset(nulls, false, sizeof(nulls));
333 228 : values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
334 228 : values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
335 228 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
336 228 : if (XLogRecPtrIsValid(sublsn))
337 2 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
338 : else
339 226 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
340 :
341 228 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
342 :
343 : /* Insert tuple into catalog. */
344 228 : CatalogTupleInsert(rel, tup);
345 :
346 228 : heap_freetuple(tup);
347 :
348 : /* Cleanup. */
349 228 : if (retain_lock)
350 : {
351 225 : table_close(rel, NoLock);
352 : }
353 : else
354 : {
355 3 : table_close(rel, RowExclusiveLock);
356 3 : UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
357 : }
358 228 : }
359 :
360 : /*
361 : * Update the state of a subscription table.
362 : */
363 : void
364 830 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
365 : XLogRecPtr sublsn, bool already_locked)
366 : {
367 : Relation rel;
368 : HeapTuple tup;
369 : bool nulls[Natts_pg_subscription_rel];
370 : Datum values[Natts_pg_subscription_rel];
371 : bool replaces[Natts_pg_subscription_rel];
372 :
373 830 : if (already_locked)
374 : {
375 : #ifdef USE_ASSERT_CHECKING
376 : LOCKTAG tag;
377 :
378 : Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
379 : RowExclusiveLock, true));
380 : SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
381 : Assert(LockHeldByMe(&tag, AccessShareLock, true));
382 : #endif
383 :
384 196 : rel = table_open(SubscriptionRelRelationId, NoLock);
385 : }
386 : else
387 : {
388 634 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
389 633 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
390 : }
391 :
392 : /* Try finding existing mapping. */
393 829 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
394 : ObjectIdGetDatum(relid),
395 : ObjectIdGetDatum(subid));
396 829 : if (!HeapTupleIsValid(tup))
397 0 : elog(ERROR, "subscription relation %u in subscription %u does not exist",
398 : relid, subid);
399 :
400 : /* Update the tuple. */
401 829 : memset(values, 0, sizeof(values));
402 829 : memset(nulls, false, sizeof(nulls));
403 829 : memset(replaces, false, sizeof(replaces));
404 :
405 829 : replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
406 829 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
407 :
408 829 : replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
409 829 : if (XLogRecPtrIsValid(sublsn))
410 408 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
411 : else
412 421 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
413 :
414 829 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
415 : replaces);
416 :
417 : /* Update the catalog. */
418 829 : CatalogTupleUpdate(rel, &tup->t_self, tup);
419 :
420 : /* Cleanup. */
421 829 : table_close(rel, NoLock);
422 829 : }
423 :
424 : /*
425 : * Get state of subscription table.
426 : *
427 : * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
428 : */
429 : char
430 4291 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
431 : {
432 : HeapTuple tup;
433 : char substate;
434 : bool isnull;
435 : Datum d;
436 : Relation rel;
437 :
438 : /*
439 : * This is to avoid the race condition with AlterSubscription which tries
440 : * to remove this relstate.
441 : */
442 4291 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
443 :
444 : /* Try finding the mapping. */
445 4291 : tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
446 : ObjectIdGetDatum(relid),
447 : ObjectIdGetDatum(subid));
448 :
449 4291 : if (!HeapTupleIsValid(tup))
450 : {
451 33 : table_close(rel, AccessShareLock);
452 33 : *sublsn = InvalidXLogRecPtr;
453 33 : return SUBREL_STATE_UNKNOWN;
454 : }
455 :
456 : /* Get the state. */
457 4258 : substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
458 :
459 : /* Get the LSN */
460 4258 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
461 : Anum_pg_subscription_rel_srsublsn, &isnull);
462 4258 : if (isnull)
463 3673 : *sublsn = InvalidXLogRecPtr;
464 : else
465 585 : *sublsn = DatumGetLSN(d);
466 :
467 : /* Cleanup */
468 4258 : ReleaseSysCache(tup);
469 :
470 4258 : table_close(rel, AccessShareLock);
471 :
472 4258 : return substate;
473 : }
474 :
475 : /*
476 : * Drop subscription relation mapping. These can be for a particular
477 : * subscription, or for a particular relation, or both.
478 : */
479 : void
480 33821 : RemoveSubscriptionRel(Oid subid, Oid relid)
481 : {
482 : Relation rel;
483 : TableScanDesc scan;
484 : ScanKeyData skey[2];
485 : HeapTuple tup;
486 33821 : int nkeys = 0;
487 :
488 33821 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
489 :
490 33821 : if (OidIsValid(subid))
491 : {
492 164 : ScanKeyInit(&skey[nkeys++],
493 : Anum_pg_subscription_rel_srsubid,
494 : BTEqualStrategyNumber,
495 : F_OIDEQ,
496 : ObjectIdGetDatum(subid));
497 : }
498 :
499 33821 : if (OidIsValid(relid))
500 : {
501 33678 : ScanKeyInit(&skey[nkeys++],
502 : Anum_pg_subscription_rel_srrelid,
503 : BTEqualStrategyNumber,
504 : F_OIDEQ,
505 : ObjectIdGetDatum(relid));
506 : }
507 :
508 : /* Do the search and delete what we found. */
509 33821 : scan = table_beginscan_catalog(rel, nkeys, skey);
510 33951 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
511 : {
512 : Form_pg_subscription_rel subrel;
513 :
514 130 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
515 :
516 : /*
517 : * We don't allow to drop the relation mapping when the table
518 : * synchronization is in progress unless the caller updates the
519 : * corresponding subscription as well. This is to ensure that we don't
520 : * leave tablesync slots or origins in the system when the
521 : * corresponding table is dropped. For sequences, however, it's ok to
522 : * drop them since no separate slots or origins are created during
523 : * synchronization.
524 : */
525 130 : if (!OidIsValid(subid) &&
526 16 : subrel->srsubstate != SUBREL_STATE_READY &&
527 0 : get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE)
528 : {
529 0 : ereport(ERROR,
530 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
531 : errmsg("could not drop relation mapping for subscription \"%s\"",
532 : get_subscription_name(subrel->srsubid, false)),
533 : errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
534 : get_rel_name(relid), subrel->srsubstate),
535 :
536 : /*
537 : * translator: first %s is a SQL ALTER command and second %s is a
538 : * SQL DROP command
539 : */
540 : errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
541 : "ALTER SUBSCRIPTION ... ENABLE",
542 : "DROP SUBSCRIPTION ...")));
543 : }
544 :
545 130 : CatalogTupleDelete(rel, &tup->t_self);
546 : }
547 33821 : table_endscan(scan);
548 :
549 33821 : table_close(rel, RowExclusiveLock);
550 33821 : }
551 :
552 : /*
553 : * Does the subscription have any tables?
554 : *
555 : * Use this function only to know true/false, and when you have no need for the
556 : * List returned by GetSubscriptionRelations.
557 : */
558 : bool
559 301 : HasSubscriptionTables(Oid subid)
560 : {
561 : Relation rel;
562 : ScanKeyData skey[1];
563 : SysScanDesc scan;
564 : HeapTuple tup;
565 301 : bool has_subtables = false;
566 :
567 301 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
568 :
569 301 : ScanKeyInit(&skey[0],
570 : Anum_pg_subscription_rel_srsubid,
571 : BTEqualStrategyNumber, F_OIDEQ,
572 : ObjectIdGetDatum(subid));
573 :
574 301 : scan = systable_beginscan(rel, InvalidOid, false,
575 : NULL, 1, skey);
576 :
577 351 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
578 : {
579 : Form_pg_subscription_rel subrel;
580 : char relkind;
581 :
582 333 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
583 333 : relkind = get_rel_relkind(subrel->srrelid);
584 :
585 333 : if (relkind == RELKIND_RELATION ||
586 : relkind == RELKIND_PARTITIONED_TABLE)
587 : {
588 283 : has_subtables = true;
589 283 : break;
590 : }
591 : }
592 :
593 : /* Cleanup */
594 301 : systable_endscan(scan);
595 301 : table_close(rel, AccessShareLock);
596 :
597 301 : return has_subtables;
598 : }
599 :
600 : /*
601 : * Get the relations for the subscription.
602 : *
603 : * If not_ready is true, return only the relations that are not in a ready
604 : * state, otherwise return all the relations of the subscription. The
605 : * returned list is palloc'ed in the current memory context.
606 : */
607 : List *
608 1226 : GetSubscriptionRelations(Oid subid, bool tables, bool sequences,
609 : bool not_ready)
610 : {
611 1226 : List *res = NIL;
612 : Relation rel;
613 : HeapTuple tup;
614 1226 : int nkeys = 0;
615 : ScanKeyData skey[2];
616 : SysScanDesc scan;
617 :
618 : /* One or both of 'tables' and 'sequences' must be true. */
619 : Assert(tables || sequences);
620 :
621 1226 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
622 :
623 1226 : ScanKeyInit(&skey[nkeys++],
624 : Anum_pg_subscription_rel_srsubid,
625 : BTEqualStrategyNumber, F_OIDEQ,
626 : ObjectIdGetDatum(subid));
627 :
628 1226 : if (not_ready)
629 1186 : ScanKeyInit(&skey[nkeys++],
630 : Anum_pg_subscription_rel_srsubstate,
631 : BTEqualStrategyNumber, F_CHARNE,
632 : CharGetDatum(SUBREL_STATE_READY));
633 :
634 1226 : scan = systable_beginscan(rel, InvalidOid, false,
635 : NULL, nkeys, skey);
636 :
637 3177 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
638 : {
639 : Form_pg_subscription_rel subrel;
640 : SubscriptionRelState *relstate;
641 : Datum d;
642 : bool isnull;
643 : char relkind;
644 :
645 1951 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
646 :
647 : /* Relation is either a sequence or a table */
648 1951 : relkind = get_rel_relkind(subrel->srrelid);
649 : Assert(relkind == RELKIND_SEQUENCE || relkind == RELKIND_RELATION ||
650 : relkind == RELKIND_PARTITIONED_TABLE);
651 :
652 : /* Skip sequences if they were not requested */
653 1951 : if ((relkind == RELKIND_SEQUENCE) && !sequences)
654 0 : continue;
655 :
656 : /* Skip tables if they were not requested */
657 1951 : if ((relkind == RELKIND_RELATION ||
658 1911 : relkind == RELKIND_PARTITIONED_TABLE) && !tables)
659 0 : continue;
660 :
661 1951 : relstate = palloc_object(SubscriptionRelState);
662 1951 : relstate->relid = subrel->srrelid;
663 1951 : relstate->state = subrel->srsubstate;
664 1951 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
665 : Anum_pg_subscription_rel_srsublsn, &isnull);
666 1951 : if (isnull)
667 1594 : relstate->lsn = InvalidXLogRecPtr;
668 : else
669 357 : relstate->lsn = DatumGetLSN(d);
670 :
671 1951 : res = lappend(res, relstate);
672 : }
673 :
674 : /* Cleanup */
675 1226 : systable_endscan(scan);
676 1226 : table_close(rel, AccessShareLock);
677 :
678 1226 : return res;
679 : }
680 :
681 : /*
682 : * Update the dead tuple retention status for the given subscription.
683 : */
684 : void
685 2 : UpdateDeadTupleRetentionStatus(Oid subid, bool active)
686 : {
687 : Relation rel;
688 : bool nulls[Natts_pg_subscription];
689 : bool replaces[Natts_pg_subscription];
690 : Datum values[Natts_pg_subscription];
691 : HeapTuple tup;
692 :
693 : /* Look up the subscription in the catalog */
694 2 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
695 2 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
696 :
697 2 : if (!HeapTupleIsValid(tup))
698 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
699 :
700 2 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
701 :
702 : /* Form a new tuple. */
703 2 : memset(values, 0, sizeof(values));
704 2 : memset(nulls, false, sizeof(nulls));
705 2 : memset(replaces, false, sizeof(replaces));
706 :
707 : /* Set the subscription to disabled. */
708 2 : values[Anum_pg_subscription_subretentionactive - 1] = BoolGetDatum(active);
709 2 : replaces[Anum_pg_subscription_subretentionactive - 1] = true;
710 :
711 : /* Update the catalog */
712 2 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
713 : replaces);
714 2 : CatalogTupleUpdate(rel, &tup->t_self, tup);
715 2 : heap_freetuple(tup);
716 :
717 2 : table_close(rel, NoLock);
718 2 : }
|