Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_subscription.c
4 : * replication subscriptions
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/catalog/pg_subscription.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/heapam.h"
19 : #include "access/htup_details.h"
20 : #include "access/tableam.h"
21 : #include "catalog/indexing.h"
22 : #include "catalog/pg_foreign_server.h"
23 : #include "catalog/pg_subscription.h"
24 : #include "catalog/pg_subscription_rel.h"
25 : #include "catalog/pg_type.h"
26 : #include "foreign/foreign.h"
27 : #include "miscadmin.h"
28 : #include "storage/lmgr.h"
29 : #include "storage/lock.h"
30 : #include "utils/acl.h"
31 : #include "utils/array.h"
32 : #include "utils/builtins.h"
33 : #include "utils/fmgroids.h"
34 : #include "utils/lsyscache.h"
35 : #include "utils/memutils.h"
36 : #include "utils/pg_lsn.h"
37 : #include "utils/rel.h"
38 : #include "utils/syscache.h"
39 :
40 : static List *textarray_to_stringlist(ArrayType *textarray); /* forward declaration; defined later in this file */
41 :
42 : /*
43 : * Append a comma-separated list of publication names to the 'dest' string.
44 : *
45 : * If quote_literal is true, the appended text can be used to construct an SQL
46 : * command, thus no translation is applied. Otherwise, the text can be used
47 : * to create a user-facing message, so translatable quote marks are added.
   : *
   : * publications: list whose cells are read with strVal(); must not be NIL
   : * (checked by an Assert).
   : * dest: StringInfo that is appended to; nothing is returned.
   : * quote_literal: when true each name goes through quote_literal_cstr() and
   : * names are separated by a plain comma-space; when false each name is wrapped
   : * in double quotes by a format string passed through _().
48 : */
49 : void
50 546 : GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
51 : {
52 : ListCell *lc;
53 546 : bool first = true; /* true until the first name has been appended */
54 :
55 : Assert(publications != NIL);
56 :
57 1400 : foreach(lc, publications)
58 : {
59 854 : char *pubname = strVal(lfirst(lc));
60 :
61 854 : if (quote_literal)
62 : {
63 844 : if (!first)
64 307 : appendStringInfoString(dest, ", ");
65 844 : appendStringInfoString(dest, quote_literal_cstr(pubname));
66 : }
67 : else
68 : { /* here the separator is part of the translatable format string */
69 10 : if (first)
70 9 : appendStringInfo(dest, _("\"%s\""), pubname);
71 : else
72 1 : appendStringInfo(dest, _(", \"%s\""), pubname);
73 : }
74 :
75 854 : first = false;
76 : }
77 546 : }
78 :
79 : /*
80 : * Fetch the subscription from the syscache.
81 : *
82 : * If conninfo_needed is true, conninfo will be constructed, possibly
83 : * encountering errors in ForeignServerConnectionString(). Callers not
84 : * expecting such errors should pass false, in which case conninfo will be
85 : * NULL.
   : *
   : * conninfo_aclcheck may only be true together with conninfo_needed (checked
   : * by an Assert). When it is true and the subscription has a valid subserver,
   : * the subscription owner's ACL_USAGE privilege on that foreign server is
   : * rechecked and an error is raised if the check does not return ACLCHECK_OK.
   : *
   : * Returns NULL if no syscache entry exists for subid and missing_ok is true;
   : * raises an error if none exists and missing_ok is false.
   : *
   : * A new memory context named "subscription" is created under
   : * CurrentMemoryContext and made current while the result is built; it is
   : * recorded in sub->cxt, and the caller's context is restored before return.
86 : */
87 : Subscription *
88 1050 : GetSubscription(Oid subid, bool missing_ok, bool conninfo_needed,
89 : bool conninfo_aclcheck)
90 : {
91 : HeapTuple tup;
92 : Subscription *sub;
93 : Form_pg_subscription subform;
94 : Datum datum;
95 : bool isnull;
96 : MemoryContext cxt;
97 : MemoryContext oldcxt;
98 :
99 : Assert(conninfo_needed || !conninfo_aclcheck);
100 :
101 1050 : tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
102 :
103 1050 : if (!HeapTupleIsValid(tup))
104 : {
105 71 : if (missing_ok)
106 71 : return NULL;
107 :
108 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
109 : }
110 :
111 979 : cxt = AllocSetContextCreate(CurrentMemoryContext, "subscription",
112 : ALLOCSET_SMALL_SIZES);
113 979 : oldcxt = MemoryContextSwitchTo(cxt); /* allocations below happen with cxt current */
114 :
115 979 : subform = (Form_pg_subscription) GETSTRUCT(tup);
116 :
117 979 : sub = palloc0_object(Subscription);
118 979 : sub->cxt = cxt;
119 979 : sub->oid = subid;
120 979 : sub->dbid = subform->subdbid;
121 979 : sub->skiplsn = subform->subskiplsn;
122 979 : sub->name = pstrdup(NameStr(subform->subname));
123 979 : sub->owner = subform->subowner;
124 979 : sub->enabled = subform->subenabled;
125 979 : sub->binary = subform->subbinary;
126 979 : sub->stream = subform->substream;
127 979 : sub->twophasestate = subform->subtwophasestate;
128 979 : sub->disableonerr = subform->subdisableonerr;
129 979 : sub->passwordrequired = subform->subpasswordrequired;
130 979 : sub->runasowner = subform->subrunasowner;
131 979 : sub->failover = subform->subfailover;
132 979 : sub->retaindeadtuples = subform->subretaindeadtuples;
133 979 : sub->maxretention = subform->submaxretention;
134 979 : sub->retentionactive = subform->subretentionactive;
135 :
136 979 : if (conninfo_needed)
137 : {
138 877 : if (OidIsValid(subform->subserver))
139 : { /* conninfo is derived from the foreign server rather than subconninfo */
140 : AclResult aclresult;
141 : ForeignServer *server;
142 :
143 5 : server = GetForeignServer(subform->subserver);
144 :
145 5 : if (conninfo_aclcheck)
146 : {
147 : /* recheck ACL if requested */
148 5 : aclresult = object_aclcheck(ForeignServerRelationId,
149 : subform->subserver,
150 : subform->subowner, ACL_USAGE);
151 :
152 5 : if (aclresult != ACLCHECK_OK)
153 0 : ereport(ERROR,
154 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
155 : errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
156 : GetUserNameFromId(subform->subowner, false),
157 : server->servername)));
158 : }
159 :
160 5 : sub->conninfo = ForeignServerConnectionString(subform->subowner,
161 : server);
162 : }
163 : else
164 : { /* no foreign server: use the stored subconninfo text */
165 872 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
166 : tup,
167 : Anum_pg_subscription_subconninfo);
168 872 : sub->conninfo = TextDatumGetCString(datum);
169 : }
170 : }
171 :
172 : /* Get slotname; a null catalog value is reported as a NULL pointer */
173 979 : datum = SysCacheGetAttr(SUBSCRIPTIONOID,
174 : tup,
175 : Anum_pg_subscription_subslotname,
176 : &isnull);
177 979 : if (!isnull)
178 935 : sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
179 : else
180 44 : sub->slotname = NULL;
181 :
182 : /* Get synccommit */
183 979 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
184 : tup,
185 : Anum_pg_subscription_subsynccommit);
186 979 : sub->synccommit = TextDatumGetCString(datum);
187 :
188 : /* Get walrcvtimeout */
189 979 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
190 : tup,
191 : Anum_pg_subscription_subwalrcvtimeout);
192 979 : sub->walrcvtimeout = TextDatumGetCString(datum);
193 :
194 : /* Get publications, converted from a text array to a list of strings */
195 979 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
196 : tup,
197 : Anum_pg_subscription_subpublications);
198 979 : sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
199 :
200 : /* Get origin */
201 979 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
202 : tup,
203 : Anum_pg_subscription_suborigin);
204 979 : sub->origin = TextDatumGetCString(datum);
205 :
206 : /* Is the subscription owner a superuser? */
207 979 : sub->ownersuperuser = superuser_arg(sub->owner);
208 :
209 979 : ReleaseSysCache(tup);
210 :
211 979 : MemoryContextSwitchTo(oldcxt); /* restore the caller's memory context */
212 :
213 979 : return sub;
214 : }
215 :
216 : /*
217 : * Return number of subscriptions defined in given database.
218 : * Used by dropdb() to check if database can indeed be dropped.
   : *
   : * The count is the number of tuples returned by a scan of the subscription
   : * catalog keyed on subdbid = dbid. The catalog is opened with
   : * RowExclusiveLock and closed with NoLock, so the lock is not released by
   : * this function.
219 : */
220 : int
221 51 : CountDBSubscriptions(Oid dbid)
222 : {
223 51 : int nsubs = 0;
224 : Relation rel;
225 : ScanKeyData scankey;
226 : SysScanDesc scan;
227 : HeapTuple tup;
228 :
229 51 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
230 :
231 51 : ScanKeyInit(&scankey,
232 : Anum_pg_subscription_subdbid,
233 : BTEqualStrategyNumber, F_OIDEQ,
234 : ObjectIdGetDatum(dbid));
235 :
236 51 : scan = systable_beginscan(rel, InvalidOid, false,
237 : NULL, 1, &scankey); /* no index is named for this scan */
238 :
239 51 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
240 0 : nsubs++; /* tuple contents are not examined, only counted */
241 :
242 51 : systable_endscan(scan);
243 :
244 51 : table_close(rel, NoLock);
245 :
246 51 : return nsubs;
247 : }
248 :
249 : /*
250 : * Disable the given subscription.
   : *
   : * Sets subenabled to false in the catalog row of subscription subid and
   : * leaves every other column as it was. Raises an error if the subscription
   : * is not found in the syscache.
   : *
   : * Takes RowExclusiveLock on the subscription catalog and AccessShareLock on
   : * the subscription object; neither lock is released by this function.
251 : */
252 : void
253 4 : DisableSubscription(Oid subid)
254 : {
255 : Relation rel;
256 : bool nulls[Natts_pg_subscription];
257 : bool replaces[Natts_pg_subscription];
258 : Datum values[Natts_pg_subscription];
259 : HeapTuple tup;
260 :
261 : /* Look up the subscription in the catalog */
262 4 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
263 4 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
264 :
265 4 : if (!HeapTupleIsValid(tup))
266 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
267 :
268 4 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
269 :
270 : /* Form a new tuple; only columns flagged in replaces[] are changed. */
271 4 : memset(values, 0, sizeof(values));
272 4 : memset(nulls, false, sizeof(nulls));
273 4 : memset(replaces, false, sizeof(replaces));
274 :
275 : /* Set the subscription to disabled. */
276 4 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
277 4 : replaces[Anum_pg_subscription_subenabled - 1] = true;
278 :
279 : /* Update the catalog */
280 4 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
281 : replaces);
282 4 : CatalogTupleUpdate(rel, &tup->t_self, tup);
283 4 : heap_freetuple(tup);
284 :
285 4 : table_close(rel, NoLock);
286 4 : }
287 :
288 : /*
289 : * Convert text array to list of strings.
290 : *
291 : * Note: the resulting list of strings is pallocated here.
   : *
   : * Each array element is converted with TextDatumGetCString() and wrapped by
   : * makeString() before being appended, in array order. Returns NIL when the
   : * array has no elements.
   : *
   : * NOTE(review): NULL is passed for the nulls output of
   : * deconstruct_array_builtin(), so per-element null flags are not looked at;
   : * presumably the array is expected to contain no null elements -- confirm.
292 : */
293 : static List *
294 979 : textarray_to_stringlist(ArrayType *textarray)
295 : {
296 : Datum *elems;
297 : int nelems,
298 : i;
299 979 : List *res = NIL;
300 :
301 979 : deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
302 :
303 979 : if (nelems == 0)
304 0 : return NIL; /* res is still NIL at this point */
305 :
306 2360 : for (i = 0; i < nelems; i++)
307 1381 : res = lappend(res, makeString(TextDatumGetCString(elems[i])));
308 :
309 979 : return res;
310 : }
311 :
312 : /*
313 : * Add new state record for a subscription table.
314 : *
315 : * If retain_lock is true, then don't release the locks taken in this function.
316 : * We normally release the locks at the end of transaction but in binary-upgrade
317 : * mode, we expect to release those immediately.
   : *
   : * The new row records subid, relid and state; srsublsn is set to sublsn when
   : * XLogRecPtrIsValid(sublsn) and is stored as null otherwise. Raises an error
   : * if a mapping for (relid, subid) already exists.
   : *
   : * Locks taken here are AccessShareLock on the subscription object and
   : * RowExclusiveLock on the subscription-relation catalog.
318 : */
319 : void
320 228 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
321 : XLogRecPtr sublsn, bool retain_lock)
322 : {
323 : Relation rel;
324 : HeapTuple tup;
325 : bool nulls[Natts_pg_subscription_rel];
326 : Datum values[Natts_pg_subscription_rel];
327 :
328 228 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
329 :
330 228 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
331 :
332 : /* Try finding existing mapping; note the key order is relid, then subid. */
333 228 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
334 : ObjectIdGetDatum(relid),
335 : ObjectIdGetDatum(subid));
336 228 : if (HeapTupleIsValid(tup))
337 0 : elog(ERROR, "subscription relation %u in subscription %u already exists",
338 : relid, subid);
339 :
340 : /* Form the tuple. */
341 228 : memset(values, 0, sizeof(values));
342 228 : memset(nulls, false, sizeof(nulls));
343 228 : values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
344 228 : values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
345 228 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
346 228 : if (XLogRecPtrIsValid(sublsn))
347 2 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
348 : else
349 226 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
350 :
351 228 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
352 :
353 : /* Insert tuple into catalog. */
354 228 : CatalogTupleInsert(rel, tup);
355 :
356 228 : heap_freetuple(tup);
357 :
358 : /* Cleanup: keep both locks, or release both, depending on retain_lock. */
359 228 : if (retain_lock)
360 : {
361 225 : table_close(rel, NoLock);
362 : }
363 : else
364 : {
365 3 : table_close(rel, RowExclusiveLock);
366 3 : UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
367 : }
368 228 : }
369 :
370 : /*
371 : * Update the state of a subscription table.
   : *
   : * Sets srsubstate to state and always rewrites srsublsn: to sublsn when
   : * XLogRecPtrIsValid(sublsn), to null otherwise. Raises an error if no
   : * mapping exists for (relid, subid).
   : *
   : * If already_locked is true, the caller must already hold RowExclusiveLock
   : * on the subscription-relation catalog and AccessShareLock on the
   : * subscription object (verified only in assert-enabled builds), and no locks
   : * are taken here. Otherwise both locks are acquired here. In either case no
   : * lock is released by this function.
   : *
   : * NOTE(review): the modified tuple is not passed to heap_freetuple() here;
   : * presumably it is left to memory-context cleanup -- confirm.
372 : */
373 : void
374 842 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
375 : XLogRecPtr sublsn, bool already_locked)
376 : {
377 : Relation rel;
378 : HeapTuple tup;
379 : bool nulls[Natts_pg_subscription_rel];
380 : Datum values[Natts_pg_subscription_rel];
381 : bool replaces[Natts_pg_subscription_rel];
382 :
383 842 : if (already_locked)
384 : {
385 : #ifdef USE_ASSERT_CHECKING
386 : LOCKTAG tag;
387 :
388 : Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
389 : RowExclusiveLock, true));
390 : SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
391 : Assert(LockHeldByMe(&tag, AccessShareLock, true));
392 : #endif
393 :
394 196 : rel = table_open(SubscriptionRelRelationId, NoLock);
395 : }
396 : else
397 : {
398 646 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
399 646 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
400 : }
401 :
402 : /* Try finding existing mapping; note the key order is relid, then subid. */
403 842 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
404 : ObjectIdGetDatum(relid),
405 : ObjectIdGetDatum(subid));
406 842 : if (!HeapTupleIsValid(tup))
407 0 : elog(ERROR, "subscription relation %u in subscription %u does not exist",
408 : relid, subid);
409 :
410 : /* Update the tuple; only columns flagged in replaces[] are changed. */
411 842 : memset(values, 0, sizeof(values));
412 842 : memset(nulls, false, sizeof(nulls));
413 842 : memset(replaces, false, sizeof(replaces));
414 :
415 842 : replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
416 842 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
417 :
418 842 : replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
419 842 : if (XLogRecPtrIsValid(sublsn))
420 414 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
421 : else
422 428 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
423 :
424 842 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
425 : replaces);
426 :
427 : /* Update the catalog. */
428 842 : CatalogTupleUpdate(rel, &tup->t_self, tup);
429 :
430 : /* Cleanup. */
431 842 : table_close(rel, NoLock);
432 842 : }
433 :
434 : /*
435 : * Get state of subscription table.
436 : *
437 : * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
   : *
   : * *sublsn is always written: it receives the stored srsublsn, or
   : * InvalidXLogRecPtr when that column is null or when no mapping exists for
   : * (relid, subid).
   : *
   : * The subscription-relation catalog is opened with AccessShareLock, and that
   : * lock is released again before returning on both paths.
438 : */
439 : char
440 1309 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
441 : {
442 : HeapTuple tup;
443 : char substate;
444 : bool isnull;
445 : Datum d;
446 : Relation rel;
447 :
448 : /*
449 : * This is to avoid the race condition with AlterSubscription which tries
450 : * to remove this relstate.
451 : */
452 1309 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
453 :
454 : /* Try finding the mapping; note the key order is relid, then subid. */
455 1309 : tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
456 : ObjectIdGetDatum(relid),
457 : ObjectIdGetDatum(subid));
458 :
459 1309 : if (!HeapTupleIsValid(tup))
460 : {
461 33 : table_close(rel, AccessShareLock);
462 33 : *sublsn = InvalidXLogRecPtr;
463 33 : return SUBREL_STATE_UNKNOWN;
464 : }
465 :
466 : /* Get the state. */
467 1276 : substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
468 :
469 : /* Get the LSN; a null column is reported as InvalidXLogRecPtr */
470 1276 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
471 : Anum_pg_subscription_rel_srsublsn, &isnull);
472 1276 : if (isnull)
473 684 : *sublsn = InvalidXLogRecPtr;
474 : else
475 592 : *sublsn = DatumGetLSN(d);
476 :
477 : /* Cleanup */
478 1276 : ReleaseSysCache(tup);
479 :
480 1276 : table_close(rel, AccessShareLock);
481 :
482 1276 : return substate;
483 : }
484 :
485 : /*
486 : * Drop subscription relation mapping. These can be for a particular
487 : * subscription, or for a particular relation, or both.
   : *
   : * A scan key is added for subid and for relid only when the respective OID
   : * is valid, and every tuple returned by the scan is deleted.
   : *
   : * When subid is not valid, a matching row whose state is not
   : * SUBREL_STATE_READY and whose relation is not a sequence causes an error
   : * instead of being deleted.
   : *
   : * The subscription-relation catalog is opened with RowExclusiveLock, and
   : * that lock is released again before returning.
488 : */
489 : void
490 33948 : RemoveSubscriptionRel(Oid subid, Oid relid)
491 : {
492 : Relation rel;
493 : TableScanDesc scan;
494 : ScanKeyData skey[2];
495 : HeapTuple tup;
496 33948 : int nkeys = 0; /* number of entries of skey[] actually filled in */
497 :
498 33948 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
499 :
500 33948 : if (OidIsValid(subid))
501 : {
502 172 : ScanKeyInit(&skey[nkeys++],
503 : Anum_pg_subscription_rel_srsubid,
504 : BTEqualStrategyNumber,
505 : F_OIDEQ,
506 : ObjectIdGetDatum(subid));
507 : }
508 :
509 33948 : if (OidIsValid(relid))
510 : {
511 33797 : ScanKeyInit(&skey[nkeys++],
512 : Anum_pg_subscription_rel_srrelid,
513 : BTEqualStrategyNumber,
514 : F_OIDEQ,
515 : ObjectIdGetDatum(relid));
516 : }
517 :
518 : /* Do the search and delete what we found. */
519 33948 : scan = table_beginscan_catalog(rel, nkeys, skey);
520 34078 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
521 : {
522 : Form_pg_subscription_rel subrel;
523 :
524 130 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
525 :
526 : /*
527 : * We don't allow to drop the relation mapping when the table
528 : * synchronization is in progress unless the caller updates the
529 : * corresponding subscription as well. This is to ensure that we don't
530 : * leave tablesync slots or origins in the system when the
531 : * corresponding table is dropped. For sequences, however, it's ok to
532 : * drop them since no separate slots or origins are created during
533 : * synchronization.
   : *
   : * NOTE(review): the errdetail below names the relation via the relid
   : * argument rather than subrel->srrelid. The two agree only when the
   : * caller passed a valid relid -- confirm that no caller passes both
   : * OIDs as invalid.
534 : */
535 130 : if (!OidIsValid(subid) &&
536 16 : subrel->srsubstate != SUBREL_STATE_READY &&
537 0 : get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE)
538 : {
539 0 : ereport(ERROR,
540 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
541 : errmsg("could not drop relation mapping for subscription \"%s\"",
542 : get_subscription_name(subrel->srsubid, false)),
543 : errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
544 : get_rel_name(relid), subrel->srsubstate),
545 :
546 : /*
547 : * translator: first %s is a SQL ALTER command and second %s is a
548 : * SQL DROP command
549 : */
550 : errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
551 : "ALTER SUBSCRIPTION ... ENABLE",
552 : "DROP SUBSCRIPTION ...")));
553 : }
554 :
555 130 : CatalogTupleDelete(rel, &tup->t_self);
556 : }
557 33948 : table_endscan(scan);
558 :
559 33948 : table_close(rel, RowExclusiveLock);
560 33948 : }
561 :
562 : /*
563 : * Does the subscription have any tables?
564 : *
565 : * Use this function only to know true/false, and when you have no need for the
566 : * List returned by GetSubscriptionRelations.
   : *
   : * Returns true as soon as one mapping of subscription subid refers to a
   : * relation whose relkind is RELKIND_RELATION or RELKIND_PARTITIONED_TABLE;
   : * mappings of any other relkind are skipped. Returns false if no such
   : * mapping is found.
   : *
   : * The subscription-relation catalog is opened with AccessShareLock, and that
   : * lock is released again before returning.
567 : */
568 : bool
569 309 : HasSubscriptionTables(Oid subid)
570 : {
571 : Relation rel;
572 : ScanKeyData skey[1];
573 : SysScanDesc scan;
574 : HeapTuple tup;
575 309 : bool has_subtables = false;
576 :
577 309 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
578 :
579 309 : ScanKeyInit(&skey[0],
580 : Anum_pg_subscription_rel_srsubid,
581 : BTEqualStrategyNumber, F_OIDEQ,
582 : ObjectIdGetDatum(subid));
583 :
584 309 : scan = systable_beginscan(rel, InvalidOid, false,
585 : NULL, 1, skey); /* no index is named for this scan */
586 :
587 368 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
588 : {
589 : Form_pg_subscription_rel subrel;
590 : char relkind;
591 :
592 349 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
593 349 : relkind = get_rel_relkind(subrel->srrelid);
594 :
595 349 : if (relkind == RELKIND_RELATION ||
596 : relkind == RELKIND_PARTITIONED_TABLE)
597 : {
598 290 : has_subtables = true;
599 290 : break; /* one table is enough to answer the question */
600 : }
601 : }
602 :
603 : /* Cleanup */
604 309 : systable_endscan(scan);
605 309 : table_close(rel, AccessShareLock);
606 :
607 309 : return has_subtables;
608 : }
609 :
610 : /*
611 : * Get the relations for the subscription.
612 : *
613 : * If not_ready is true, return only the relations that are not in a ready
614 : * state, otherwise return all the relations of the subscription. The
615 : * returned list is palloc'ed in the current memory context.
   : *
   : * tables / sequences select which kinds of relation are included; at least
   : * one of them must be true (checked by an Assert). Tables here means relkind
   : * RELKIND_RELATION or RELKIND_PARTITIONED_TABLE.
   : *
   : * Each list element is a SubscriptionRelState carrying relid, state and lsn;
   : * lsn is InvalidXLogRecPtr when srsublsn is null. Returns NIL if nothing
   : * matches.
   : *
   : * The subscription-relation catalog is opened with AccessShareLock, and that
   : * lock is released again before returning.
616 : */
617 : List *
618 1251 : GetSubscriptionRelations(Oid subid, bool tables, bool sequences,
619 : bool not_ready)
620 : {
621 1251 : List *res = NIL;
622 : Relation rel;
623 : HeapTuple tup;
624 1251 : int nkeys = 0;
625 : ScanKeyData skey[2];
626 : SysScanDesc scan;
627 :
628 : /* One or both of 'tables' and 'sequences' must be true. */
629 : Assert(tables || sequences);
630 :
631 1251 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
632 :
633 1251 : ScanKeyInit(&skey[nkeys++],
634 : Anum_pg_subscription_rel_srsubid,
635 : BTEqualStrategyNumber, F_OIDEQ,
636 : ObjectIdGetDatum(subid));
637 :
638 1251 : if (not_ready) /* NOTE(review): BTEqualStrategyNumber is paired with F_CHARNE below; presumably only the comparison function matters for this scan -- confirm */
639 1210 : ScanKeyInit(&skey[nkeys++],
640 : Anum_pg_subscription_rel_srsubstate,
641 : BTEqualStrategyNumber, F_CHARNE,
642 : CharGetDatum(SUBREL_STATE_READY));
643 :
644 1251 : scan = systable_beginscan(rel, InvalidOid, false,
645 : NULL, nkeys, skey); /* no index is named for this scan */
646 :
647 3239 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
648 : {
649 : Form_pg_subscription_rel subrel;
650 : SubscriptionRelState *relstate;
651 : Datum d;
652 : bool isnull;
653 : char relkind;
654 :
655 1988 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
656 :
657 : /* Relation is either a sequence or a table */
658 1988 : relkind = get_rel_relkind(subrel->srrelid);
659 : Assert(relkind == RELKIND_SEQUENCE || relkind == RELKIND_RELATION ||
660 : relkind == RELKIND_PARTITIONED_TABLE);
661 :
662 : /* Skip sequences if they were not requested */
663 1988 : if ((relkind == RELKIND_SEQUENCE) && !sequences)
664 0 : continue;
665 :
666 : /* Skip tables if they were not requested */
667 1988 : if ((relkind == RELKIND_RELATION ||
668 1938 : relkind == RELKIND_PARTITIONED_TABLE) && !tables)
669 0 : continue;
670 :
671 1988 : relstate = palloc_object(SubscriptionRelState);
672 1988 : relstate->relid = subrel->srrelid;
673 1988 : relstate->state = subrel->srsubstate;
674 1988 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
675 : Anum_pg_subscription_rel_srsublsn, &isnull);
676 1988 : if (isnull)
677 1621 : relstate->lsn = InvalidXLogRecPtr;
678 : else
679 367 : relstate->lsn = DatumGetLSN(d);
680 :
681 1988 : res = lappend(res, relstate);
682 : }
683 :
684 : /* Cleanup */
685 1251 : systable_endscan(scan);
686 1251 : table_close(rel, AccessShareLock);
687 :
688 1251 : return res;
689 : }
690 :
691 : /*
692 : * Update the dead tuple retention status for the given subscription.
   : *
   : * Sets subretentionactive to 'active' in the catalog row of subscription
   : * subid and leaves every other column as it was. Raises an error if the
   : * subscription is not found in the syscache.
   : *
   : * Takes RowExclusiveLock on the subscription catalog and AccessShareLock on
   : * the subscription object; neither lock is released by this function.
693 : */
694 : void
695 2 : UpdateDeadTupleRetentionStatus(Oid subid, bool active)
696 : {
697 : Relation rel;
698 : bool nulls[Natts_pg_subscription];
699 : bool replaces[Natts_pg_subscription];
700 : Datum values[Natts_pg_subscription];
701 : HeapTuple tup;
702 :
703 : /* Look up the subscription in the catalog */
704 2 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
705 2 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
706 :
707 2 : if (!HeapTupleIsValid(tup))
708 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
709 :
710 2 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
711 :
712 : /* Form a new tuple; only columns flagged in replaces[] are changed. */
713 2 : memset(values, 0, sizeof(values));
714 2 : memset(nulls, false, sizeof(nulls));
715 2 : memset(replaces, false, sizeof(replaces));
716 :
717 : /* Set the retention-active flag to the requested value. */
718 2 : values[Anum_pg_subscription_subretentionactive - 1] = BoolGetDatum(active);
719 2 : replaces[Anum_pg_subscription_subretentionactive - 1] = true;
720 :
721 : /* Update the catalog */
722 2 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
723 : replaces);
724 2 : CatalogTupleUpdate(rel, &tup->t_self, tup);
725 2 : heap_freetuple(tup);
726 :
727 2 : table_close(rel, NoLock);
728 2 : }
|