Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_publication.c
4 : * publication C API manipulation
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/catalog/pg_publication.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/heapam.h"
19 : #include "access/htup_details.h"
20 : #include "access/tableam.h"
21 : #include "catalog/catalog.h"
22 : #include "catalog/dependency.h"
23 : #include "catalog/indexing.h"
24 : #include "catalog/namespace.h"
25 : #include "catalog/objectaddress.h"
26 : #include "catalog/partition.h"
27 : #include "catalog/pg_inherits.h"
28 : #include "catalog/pg_namespace.h"
29 : #include "catalog/pg_publication.h"
30 : #include "catalog/pg_publication_namespace.h"
31 : #include "catalog/pg_publication_rel.h"
32 : #include "catalog/pg_type.h"
33 : #include "commands/publicationcmds.h"
34 : #include "funcapi.h"
35 : #include "utils/array.h"
36 : #include "utils/builtins.h"
37 : #include "utils/catcache.h"
38 : #include "utils/fmgroids.h"
39 : #include "utils/lsyscache.h"
40 : #include "utils/rel.h"
41 : #include "utils/syscache.h"
42 :
43 : /* Records association between publication and published table */
44 : typedef struct
45 : {
46 : Oid relid; /* OID of published table */
47 : Oid pubid; /* OID of publication that publishes this
48 : * table. */
49 : } published_rel;
50 :
51 : /*
52 : * Check if relation can be in given publication and throws appropriate
53 : * error if not.
54 : */
55 : static void
56 808 : check_publication_add_relation(PublicationRelInfo *pri)
57 : {
58 808 : Relation targetrel = pri->relation;
59 : const char *errormsg;
60 :
61 808 : if (pri->except)
62 81 : errormsg = gettext_noop("cannot specify relation \"%s\" in the publication EXCEPT clause");
63 : else
64 727 : errormsg = gettext_noop("cannot add relation \"%s\" to publication");
65 :
66 : /* If in EXCEPT clause, must be root partitioned table */
67 808 : if (pri->except && targetrel->rd_rel->relispartition)
68 4 : ereport(ERROR,
69 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
70 : errmsg(errormsg, RelationGetRelationName(targetrel)),
71 : errdetail("This operation is not supported for individual partitions.")));
72 :
73 : /* Must be a regular or partitioned table */
74 804 : if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
75 115 : RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
76 9 : ereport(ERROR,
77 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
78 : errmsg(errormsg, RelationGetRelationName(targetrel)),
79 : errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind)));
80 :
81 : /* Can't be system table */
82 795 : if (IsCatalogRelation(targetrel))
83 4 : ereport(ERROR,
84 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
85 : errmsg(errormsg, RelationGetRelationName(targetrel)),
86 : errdetail("This operation is not supported for system tables.")));
87 :
88 : /* UNLOGGED and TEMP relations cannot be part of publication. */
89 791 : if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
90 4 : ereport(ERROR,
91 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
92 : errmsg(errormsg, RelationGetRelationName(targetrel)),
93 : errdetail("This operation is not supported for temporary tables.")));
94 787 : else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
95 4 : ereport(ERROR,
96 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
97 : errmsg(errormsg, RelationGetRelationName(targetrel)),
98 : errdetail("This operation is not supported for unlogged tables.")));
99 783 : }
100 :
101 : /*
102 : * Check if schema can be in given publication and throw appropriate error if
103 : * not.
104 : */
105 : static void
106 163 : check_publication_add_schema(Oid schemaid)
107 : {
108 : /* Can't be system namespace */
109 163 : if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid))
110 4 : ereport(ERROR,
111 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
112 : errmsg("cannot add schema \"%s\" to publication",
113 : get_namespace_name(schemaid)),
114 : errdetail("This operation is not supported for system schemas.")));
115 :
116 : /* Can't be temporary namespace */
117 159 : if (isAnyTempNamespace(schemaid))
118 0 : ereport(ERROR,
119 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
120 : errmsg("cannot add schema \"%s\" to publication",
121 : get_namespace_name(schemaid)),
122 : errdetail("Temporary schemas cannot be replicated.")));
123 159 : }
124 :
125 : /*
126 : * Returns if relation represented by oid and Form_pg_class entry
127 : * is publishable.
128 : *
129 : * Does same checks as check_publication_add_relation() above except for
130 : * RELKIND_SEQUENCE, but does not need relation to be opened and also does
131 : * not throw errors. Here, the additional check is to support ALL SEQUENCES
132 : * publication.
133 : *
134 : * XXX This also excludes all tables with relid < FirstNormalObjectId,
135 : * ie all tables created during initdb. This mainly affects the preinstalled
136 : * information_schema. IsCatalogRelationOid() only excludes tables with
137 : * relid < FirstUnpinnedObjectId, making that test rather redundant,
138 : * but really we should get rid of the FirstNormalObjectId test not
139 : * IsCatalogRelationOid. We can't do so today because we don't want
140 : * information_schema tables to be considered publishable; but this test
141 : * is really inadequate for that, since the information_schema could be
142 : * dropped and reloaded and then it'll be considered publishable. The best
143 : * long-term solution may be to add a "relispublishable" bool to pg_class,
144 : * and depend on that instead of OID checks.
145 : */
146 : static bool
147 322939 : is_publishable_class(Oid relid, Form_pg_class reltuple)
148 : {
149 331211 : return (reltuple->relkind == RELKIND_RELATION ||
150 8272 : reltuple->relkind == RELKIND_PARTITIONED_TABLE ||
151 7264 : reltuple->relkind == RELKIND_SEQUENCE) &&
152 317046 : !IsCatalogRelationOid(relid) &&
153 645878 : reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
154 : relid >= FirstNormalObjectId;
155 : }
156 :
157 : /*
158 : * Another variant of is_publishable_class(), taking a Relation.
159 : */
160 : bool
161 297427 : is_publishable_relation(Relation rel)
162 : {
163 297427 : return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
164 : }
165 :
166 : /*
167 : * Similar to is_publishable_class() but checks whether the given OID
168 : * is a publishable "table" or not.
169 : */
170 : static bool
171 551 : is_publishable_table(Oid tableoid)
172 : {
173 : HeapTuple tuple;
174 : Form_pg_class relform;
175 :
176 551 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(tableoid));
177 551 : if (!HeapTupleIsValid(tuple))
178 4 : return false;
179 :
180 547 : relform = (Form_pg_class) GETSTRUCT(tuple);
181 :
182 : /*
183 : * is_publishable_class() includes sequences, so we need to explicitly
184 : * check the relkind to filter them out here.
185 : */
186 1094 : if (relform->relkind != RELKIND_SEQUENCE &&
187 547 : is_publishable_class(tableoid, relform))
188 : {
189 543 : ReleaseSysCache(tuple);
190 543 : return true;
191 : }
192 :
193 4 : ReleaseSysCache(tuple);
194 4 : return false;
195 : }
196 :
197 : /*
198 : * SQL-callable variant of the above
199 : *
200 : * This returns null when the relation does not exist. This is intended to be
201 : * used for example in psql to avoid gratuitous errors when there are
202 : * concurrent catalog changes.
203 : */
204 : Datum
205 4288 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
206 : {
207 4288 : Oid relid = PG_GETARG_OID(0);
208 : HeapTuple tuple;
209 : bool result;
210 :
211 4288 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
212 4288 : if (!HeapTupleIsValid(tuple))
213 0 : PG_RETURN_NULL();
214 4288 : result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
215 4288 : ReleaseSysCache(tuple);
216 4288 : PG_RETURN_BOOL(result);
217 : }
218 :
219 : /*
220 : * Returns true if the ancestor is in the list of published relations.
221 : * Otherwise, returns false.
222 : */
223 : static bool
224 61 : is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
225 : {
226 : ListCell *lc;
227 :
228 277 : foreach(lc, table_infos)
229 : {
230 246 : Oid relid = ((published_rel *) lfirst(lc))->relid;
231 :
232 246 : if (relid == ancestor)
233 30 : return true;
234 : }
235 :
236 31 : return false;
237 : }
238 :
239 : /*
240 : * Filter out the partitions whose parent tables are also present in the list.
241 : */
242 : static void
243 169 : filter_partitions(List *table_infos)
244 : {
245 : ListCell *lc;
246 :
247 399 : foreach(lc, table_infos)
248 : {
249 230 : bool skip = false;
250 230 : List *ancestors = NIL;
251 : ListCell *lc2;
252 230 : published_rel *table_info = (published_rel *) lfirst(lc);
253 :
254 230 : if (get_rel_relispartition(table_info->relid))
255 61 : ancestors = get_partition_ancestors(table_info->relid);
256 :
257 261 : foreach(lc2, ancestors)
258 : {
259 61 : Oid ancestor = lfirst_oid(lc2);
260 :
261 61 : if (is_ancestor_member_tableinfos(ancestor, table_infos))
262 : {
263 30 : skip = true;
264 30 : break;
265 : }
266 : }
267 :
268 230 : if (skip)
269 30 : table_infos = foreach_delete_current(table_infos, lc);
270 : }
271 169 : }
272 :
273 : /*
274 : * Returns true if any schema is associated with the publication, false if no
275 : * schema is associated with the publication.
276 : */
277 : bool
278 215 : is_schema_publication(Oid pubid)
279 : {
280 : Relation pubschsrel;
281 : ScanKeyData scankey;
282 : SysScanDesc scan;
283 : HeapTuple tup;
284 215 : bool result = false;
285 :
286 215 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
287 215 : ScanKeyInit(&scankey,
288 : Anum_pg_publication_namespace_pnpubid,
289 : BTEqualStrategyNumber, F_OIDEQ,
290 : ObjectIdGetDatum(pubid));
291 :
292 215 : scan = systable_beginscan(pubschsrel,
293 : PublicationNamespacePnnspidPnpubidIndexId,
294 : true, NULL, 1, &scankey);
295 215 : tup = systable_getnext(scan);
296 215 : result = HeapTupleIsValid(tup);
297 :
298 215 : systable_endscan(scan);
299 215 : table_close(pubschsrel, AccessShareLock);
300 :
301 215 : return result;
302 : }
303 :
304 : /*
305 : * Returns true if the publication has explicitly included relation (i.e.,
306 : * not marked as EXCEPT).
307 : */
308 : bool
309 49 : is_table_publication(Oid pubid)
310 : {
311 : Relation pubrelsrel;
312 : ScanKeyData scankey;
313 : SysScanDesc scan;
314 : HeapTuple tup;
315 49 : bool result = false;
316 :
317 49 : pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
318 49 : ScanKeyInit(&scankey,
319 : Anum_pg_publication_rel_prpubid,
320 : BTEqualStrategyNumber, F_OIDEQ,
321 : ObjectIdGetDatum(pubid));
322 :
323 49 : scan = systable_beginscan(pubrelsrel,
324 : PublicationRelPrpubidIndexId,
325 : true, NULL, 1, &scankey);
326 49 : tup = systable_getnext(scan);
327 49 : if (HeapTupleIsValid(tup))
328 : {
329 : Form_pg_publication_rel pubrel;
330 :
331 25 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
332 :
333 : /*
334 : * For any publication, pg_publication_rel contains either only EXCEPT
335 : * entries or only explicitly included tables. Therefore, examining
336 : * the first tuple is sufficient to determine table inclusion.
337 : */
338 25 : result = !pubrel->prexcept;
339 : }
340 :
341 49 : systable_endscan(scan);
342 49 : table_close(pubrelsrel, AccessShareLock);
343 :
344 49 : return result;
345 : }
346 :
347 : /*
348 : * Returns true if the relation has column list associated with the
349 : * publication, false otherwise.
350 : *
351 : * If a column list is found, the corresponding bitmap is returned through the
352 : * cols parameter, if provided. The bitmap is constructed within the given
353 : * memory context (mcxt).
354 : */
355 : bool
356 926 : check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt,
357 : Bitmapset **cols)
358 : {
359 : HeapTuple cftuple;
360 926 : bool found = false;
361 :
362 926 : if (pub->alltables)
363 222 : return false;
364 :
365 704 : cftuple = SearchSysCache2(PUBLICATIONRELMAP,
366 : ObjectIdGetDatum(relid),
367 : ObjectIdGetDatum(pub->oid));
368 704 : if (HeapTupleIsValid(cftuple))
369 : {
370 : Datum cfdatum;
371 : bool isnull;
372 :
373 : /* Lookup the column list attribute. */
374 646 : cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
375 : Anum_pg_publication_rel_prattrs, &isnull);
376 :
377 : /* Was a column list found? */
378 646 : if (!isnull)
379 : {
380 : /* Build the column list bitmap in the given memory context. */
381 191 : if (cols)
382 188 : *cols = pub_collist_to_bitmapset(*cols, cfdatum, mcxt);
383 :
384 191 : found = true;
385 : }
386 :
387 646 : ReleaseSysCache(cftuple);
388 : }
389 :
390 704 : return found;
391 : }
392 :
393 : /*
394 : * Gets the relations based on the publication partition option for a specified
395 : * relation.
396 : */
397 : List *
398 2499 : GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
399 : Oid relid)
400 : {
401 2499 : if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
402 : pub_partopt != PUBLICATION_PART_ROOT)
403 701 : {
404 701 : List *all_parts = find_all_inheritors(relid, NoLock,
405 : NULL);
406 :
407 701 : if (pub_partopt == PUBLICATION_PART_ALL)
408 682 : result = list_concat(result, all_parts);
409 19 : else if (pub_partopt == PUBLICATION_PART_LEAF)
410 : {
411 : ListCell *lc;
412 :
413 69 : foreach(lc, all_parts)
414 : {
415 50 : Oid partOid = lfirst_oid(lc);
416 :
417 50 : if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
418 30 : result = lappend_oid(result, partOid);
419 : }
420 : }
421 : else
422 : Assert(false);
423 : }
424 : else
425 1798 : result = lappend_oid(result, relid);
426 :
427 2499 : return result;
428 : }
429 :
430 : /*
431 : * Returns the relid of the topmost ancestor that is published via this
432 : * publication if any and set its ancestor level to ancestor_level,
433 : * otherwise returns InvalidOid.
434 : *
435 : * The ancestor_level value allows us to compare the results for multiple
436 : * publications, and decide which value is higher up.
437 : *
438 : * Note that the list of ancestors should be ordered such that the topmost
439 : * ancestor is at the end of the list.
440 : */
441 : Oid
442 427 : GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
443 : {
444 : ListCell *lc;
445 427 : Oid topmost_relid = InvalidOid;
446 427 : int level = 0;
447 :
448 : /*
449 : * Find the "topmost" ancestor that is in this publication.
450 : */
451 865 : foreach(lc, ancestors)
452 : {
453 438 : Oid ancestor = lfirst_oid(lc);
454 438 : List *apubids = GetRelationIncludedPublications(ancestor);
455 438 : List *aschemaPubids = NIL;
456 :
457 438 : level++;
458 :
459 438 : if (list_member_oid(apubids, puboid))
460 : {
461 223 : topmost_relid = ancestor;
462 :
463 223 : if (ancestor_level)
464 43 : *ancestor_level = level;
465 : }
466 : else
467 : {
468 215 : aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
469 215 : if (list_member_oid(aschemaPubids, puboid))
470 : {
471 13 : topmost_relid = ancestor;
472 :
473 13 : if (ancestor_level)
474 5 : *ancestor_level = level;
475 : }
476 : }
477 :
478 438 : list_free(apubids);
479 438 : list_free(aschemaPubids);
480 : }
481 :
482 427 : return topmost_relid;
483 : }
484 :
485 : /*
486 : * attnumstoint2vector
487 : * Convert a Bitmapset of AttrNumbers into an int2vector.
488 : *
489 : * AttrNumber numbers are 0-based, i.e., not offset by
490 : * FirstLowInvalidHeapAttributeNumber.
491 : */
492 : static int2vector *
493 212 : attnumstoint2vector(Bitmapset *attrs)
494 : {
495 : int2vector *result;
496 212 : int n = bms_num_members(attrs);
497 212 : int i = -1;
498 212 : int j = 0;
499 :
500 212 : result = buildint2vector(NULL, n);
501 :
502 577 : while ((i = bms_next_member(attrs, i)) >= 0)
503 : {
504 : Assert(i <= PG_INT16_MAX);
505 :
506 365 : result->values[j++] = (int16) i;
507 : }
508 :
509 212 : return result;
510 : }
511 :
512 : /*
513 : * Insert new publication / relation mapping.
514 : */
515 : ObjectAddress
516 830 : publication_add_relation(Oid pubid, PublicationRelInfo *pri,
517 : bool if_not_exists, AlterPublicationStmt *alter_stmt)
518 : {
519 : Relation rel;
520 : HeapTuple tup;
521 : Datum values[Natts_pg_publication_rel];
522 : bool nulls[Natts_pg_publication_rel];
523 830 : Relation targetrel = pri->relation;
524 830 : Oid relid = RelationGetRelid(targetrel);
525 : Oid pubreloid;
526 : Bitmapset *attnums;
527 830 : Publication *pub = GetPublication(pubid);
528 : ObjectAddress myself,
529 : referenced;
530 830 : List *relids = NIL;
531 : int i;
532 : bool inval_except_table;
533 :
534 830 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
535 :
536 : /*
537 : * Check for duplicates. Note that this does not really prevent
538 : * duplicates, it's here just to provide nicer error message in common
539 : * case. The real protection is the unique key on the catalog.
540 : */
541 830 : if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
542 : ObjectIdGetDatum(pubid)))
543 : {
544 22 : table_close(rel, RowExclusiveLock);
545 :
546 22 : if (if_not_exists)
547 18 : return InvalidObjectAddress;
548 :
549 4 : ereport(ERROR,
550 : (errcode(ERRCODE_DUPLICATE_OBJECT),
551 : errmsg("relation \"%s\" is already member of publication \"%s\"",
552 : RelationGetRelationName(targetrel), pub->name)));
553 : }
554 :
555 808 : check_publication_add_relation(pri);
556 :
557 : /* Validate and translate column names into a Bitmapset of attnums. */
558 783 : attnums = pub_collist_validate(pri->relation, pri->columns);
559 :
560 : /* Form a tuple. */
561 767 : memset(values, 0, sizeof(values));
562 767 : memset(nulls, false, sizeof(nulls));
563 :
564 767 : pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
565 : Anum_pg_publication_rel_oid);
566 767 : values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid);
567 767 : values[Anum_pg_publication_rel_prpubid - 1] =
568 767 : ObjectIdGetDatum(pubid);
569 767 : values[Anum_pg_publication_rel_prrelid - 1] =
570 767 : ObjectIdGetDatum(relid);
571 767 : values[Anum_pg_publication_rel_prexcept - 1] =
572 767 : BoolGetDatum(pri->except);
573 :
574 : /* Add qualifications, if available */
575 767 : if (pri->whereClause != NULL)
576 219 : values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
577 : else
578 548 : nulls[Anum_pg_publication_rel_prqual - 1] = true;
579 :
580 : /* Add column list, if available */
581 767 : if (pri->columns)
582 212 : values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(attnumstoint2vector(attnums));
583 : else
584 555 : nulls[Anum_pg_publication_rel_prattrs - 1] = true;
585 :
586 767 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
587 :
588 : /* Insert tuple into catalog. */
589 767 : CatalogTupleInsert(rel, tup);
590 767 : heap_freetuple(tup);
591 :
592 : /* Register dependencies as needed */
593 767 : ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
594 :
595 : /* Add dependency on the publication */
596 767 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
597 767 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
598 :
599 : /* Add dependency on the relation */
600 767 : ObjectAddressSet(referenced, RelationRelationId, relid);
601 767 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
602 :
603 : /* Add dependency on the objects mentioned in the qualifications */
604 767 : if (pri->whereClause)
605 219 : recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
606 : DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
607 : false);
608 :
609 : /* Add dependency on the columns, if any are listed */
610 767 : i = -1;
611 1132 : while ((i = bms_next_member(attnums, i)) >= 0)
612 : {
613 365 : ObjectAddressSubSet(referenced, RelationRelationId, relid, i);
614 365 : recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
615 : }
616 :
617 : /* Close the table. */
618 767 : table_close(rel, RowExclusiveLock);
619 :
620 : /*
621 : * Determine whether EXCEPT tables require explicit relcache invalidation.
622 : *
623 : * For CREATE PUBLICATION with EXCEPT tables, invalidation is skipped
624 : * here, as CreatePublication() function invalidates all relations as part
625 : * of defining a FOR ALL TABLES publication.
626 : *
627 : * For ALTER PUBLICATION, invalidation is needed only when adding an
628 : * EXCEPT table to a publication already marked as ALL TABLES. For
629 : * publications that were originally empty or defined as ALL SEQUENCES and
630 : * are being converted to ALL TABLES, invalidation is skipped here, as
631 : * AlterPublicationAllFlags() function invalidates all relations while
632 : * marking the publication as ALL TABLES publication.
633 : */
634 776 : inval_except_table = (alter_stmt != NULL) && pub->alltables &&
635 9 : (alter_stmt->for_all_tables && pri->except);
636 :
637 767 : if (!pri->except || inval_except_table)
638 : {
639 : /*
640 : * Invalidate relcache so that publication info is rebuilt.
641 : *
642 : * For the partitioned tables, we must invalidate all partitions
643 : * contained in the respective partition hierarchies, not just the one
644 : * explicitly mentioned in the publication. This is required because
645 : * we implicitly publish the child tables when the parent table is
646 : * published.
647 : */
648 699 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
649 : relid);
650 :
651 699 : InvalidatePublicationRels(relids);
652 : }
653 :
654 767 : return myself;
655 : }
656 :
657 : /*
658 : * pub_collist_validate
659 : * Process and validate the 'columns' list and ensure the columns are all
660 : * valid to use for a publication. Checks for and raises an ERROR for
661 : * any unknown columns, system columns, duplicate columns, or virtual
662 : * generated columns.
663 : *
664 : * Looks up each column's attnum and returns a 0-based Bitmapset of the
665 : * corresponding attnums.
666 : */
667 : Bitmapset *
668 1073 : pub_collist_validate(Relation targetrel, List *columns)
669 : {
670 1073 : Bitmapset *set = NULL;
671 : ListCell *lc;
672 1073 : TupleDesc tupdesc = RelationGetDescr(targetrel);
673 :
674 1618 : foreach(lc, columns)
675 : {
676 569 : char *colname = strVal(lfirst(lc));
677 569 : AttrNumber attnum = get_attnum(RelationGetRelid(targetrel), colname);
678 :
679 569 : if (attnum == InvalidAttrNumber)
680 4 : ereport(ERROR,
681 : errcode(ERRCODE_UNDEFINED_COLUMN),
682 : errmsg("column \"%s\" of relation \"%s\" does not exist",
683 : colname, RelationGetRelationName(targetrel)));
684 :
685 565 : if (!AttrNumberIsForUserDefinedAttr(attnum))
686 8 : ereport(ERROR,
687 : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
688 : errmsg("cannot use system column \"%s\" in publication column list",
689 : colname));
690 :
691 557 : if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
692 4 : ereport(ERROR,
693 : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
694 : errmsg("cannot use virtual generated column \"%s\" in publication column list",
695 : colname));
696 :
697 553 : if (bms_is_member(attnum, set))
698 8 : ereport(ERROR,
699 : errcode(ERRCODE_DUPLICATE_OBJECT),
700 : errmsg("duplicate column \"%s\" in publication column list",
701 : colname));
702 :
703 545 : set = bms_add_member(set, attnum);
704 : }
705 :
706 1049 : return set;
707 : }
708 :
709 : /*
710 : * Transform a column list (represented by an array Datum) to a bitmapset.
711 : *
712 : * If columns isn't NULL, add the column numbers to that set.
713 : *
714 : * If mcxt isn't NULL, build the bitmapset in that context.
715 : */
716 : Bitmapset *
717 278 : pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
718 : {
719 278 : Bitmapset *result = columns;
720 : ArrayType *arr;
721 : int nelems;
722 : int16 *elems;
723 278 : MemoryContext oldcxt = NULL;
724 :
725 278 : arr = DatumGetArrayTypeP(pubcols);
726 278 : nelems = ARR_DIMS(arr)[0];
727 278 : elems = (int16 *) ARR_DATA_PTR(arr);
728 :
729 : /* If a memory context was specified, switch to it. */
730 278 : if (mcxt)
731 39 : oldcxt = MemoryContextSwitchTo(mcxt);
732 :
733 764 : for (int i = 0; i < nelems; i++)
734 486 : result = bms_add_member(result, elems[i]);
735 :
736 278 : if (mcxt)
737 39 : MemoryContextSwitchTo(oldcxt);
738 :
739 278 : return result;
740 : }
741 :
742 : /*
743 : * Returns a bitmap representing the columns of the specified table.
744 : *
745 : * Generated columns are included if include_gencols_type is
746 : * PUBLISH_GENCOLS_STORED.
747 : */
748 : Bitmapset *
749 9 : pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type)
750 : {
751 9 : Bitmapset *result = NULL;
752 9 : TupleDesc desc = RelationGetDescr(relation);
753 :
754 30 : for (int i = 0; i < desc->natts; i++)
755 : {
756 21 : Form_pg_attribute att = TupleDescAttr(desc, i);
757 :
758 21 : if (att->attisdropped)
759 1 : continue;
760 :
761 20 : if (att->attgenerated)
762 : {
763 : /* We only support replication of STORED generated cols. */
764 2 : if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
765 1 : continue;
766 :
767 : /* User hasn't requested to replicate STORED generated cols. */
768 1 : if (include_gencols_type != PUBLISH_GENCOLS_STORED)
769 1 : continue;
770 : }
771 :
772 18 : result = bms_add_member(result, att->attnum);
773 : }
774 :
775 9 : return result;
776 : }
777 :
778 : /*
779 : * Insert new publication / schema mapping.
780 : */
781 : ObjectAddress
782 175 : publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
783 : {
784 : Relation rel;
785 : HeapTuple tup;
786 : Datum values[Natts_pg_publication_namespace];
787 : bool nulls[Natts_pg_publication_namespace];
788 : Oid psschid;
789 175 : Publication *pub = GetPublication(pubid);
790 175 : List *schemaRels = NIL;
791 : ObjectAddress myself,
792 : referenced;
793 :
794 175 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
795 :
796 : /*
797 : * Check for duplicates. Note that this does not really prevent
798 : * duplicates, it's here just to provide nicer error message in common
799 : * case. The real protection is the unique key on the catalog.
800 : */
801 175 : if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
802 : ObjectIdGetDatum(schemaid),
803 : ObjectIdGetDatum(pubid)))
804 : {
805 12 : table_close(rel, RowExclusiveLock);
806 :
807 12 : if (if_not_exists)
808 8 : return InvalidObjectAddress;
809 :
810 4 : ereport(ERROR,
811 : (errcode(ERRCODE_DUPLICATE_OBJECT),
812 : errmsg("schema \"%s\" is already member of publication \"%s\"",
813 : get_namespace_name(schemaid), pub->name)));
814 : }
815 :
816 163 : check_publication_add_schema(schemaid);
817 :
818 : /* Form a tuple */
819 159 : memset(values, 0, sizeof(values));
820 159 : memset(nulls, false, sizeof(nulls));
821 :
822 159 : psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId,
823 : Anum_pg_publication_namespace_oid);
824 159 : values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid);
825 159 : values[Anum_pg_publication_namespace_pnpubid - 1] =
826 159 : ObjectIdGetDatum(pubid);
827 159 : values[Anum_pg_publication_namespace_pnnspid - 1] =
828 159 : ObjectIdGetDatum(schemaid);
829 :
830 159 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
831 :
832 : /* Insert tuple into catalog */
833 159 : CatalogTupleInsert(rel, tup);
834 159 : heap_freetuple(tup);
835 :
836 159 : ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid);
837 :
838 : /* Add dependency on the publication */
839 159 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
840 159 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
841 :
842 : /* Add dependency on the schema */
843 159 : ObjectAddressSet(referenced, NamespaceRelationId, schemaid);
844 159 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
845 :
846 : /* Close the table */
847 159 : table_close(rel, RowExclusiveLock);
848 :
849 : /*
850 : * Invalidate relcache so that publication info is rebuilt. See
851 : * publication_add_relation for why we need to consider all the
852 : * partitions.
853 : */
854 159 : schemaRels = GetSchemaPublicationRelations(schemaid,
855 : PUBLICATION_PART_ALL);
856 159 : InvalidatePublicationRels(schemaRels);
857 :
858 159 : return myself;
859 : }
860 :
861 : /*
862 : * Internal function to get the list of publication oids for a relation.
863 : *
864 : * If except_flag is true, returns the list of publication that specified the
865 : * relation in the EXCEPT clause; otherwise, returns the list of publications
866 : * in which relation is included.
867 : */
868 : static List *
869 16583 : get_relation_publications(Oid relid, bool except_flag)
870 : {
871 16583 : List *result = NIL;
872 : CatCList *pubrellist;
873 :
874 : /* Find all publications associated with the relation. */
875 16583 : pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
876 : ObjectIdGetDatum(relid));
877 18235 : for (int i = 0; i < pubrellist->n_members; i++)
878 : {
879 1652 : HeapTuple tup = &pubrellist->members[i]->tuple;
880 1652 : Form_pg_publication_rel pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
881 1652 : Oid pubid = pubrel->prpubid;
882 :
883 1652 : if (pubrel->prexcept == except_flag)
884 1181 : result = lappend_oid(result, pubid);
885 : }
886 :
887 16583 : ReleaseSysCacheList(pubrellist);
888 :
889 16583 : return result;
890 : }
891 :
892 : /*
893 : * Gets list of publication oids for a relation.
894 : */
895 : List *
896 8941 : GetRelationIncludedPublications(Oid relid)
897 : {
898 8941 : return get_relation_publications(relid, false);
899 : }
900 :
901 : /*
902 : * Gets list of publication oids which has relation in the EXCEPT clause.
903 : */
904 : List *
905 7642 : GetRelationExcludedPublications(Oid relid)
906 : {
907 7642 : return get_relation_publications(relid, true);
908 : }
909 :
910 : /*
911 : * Internal function to get the list of relation oids for a publication.
912 : *
913 : * If except_flag is true, returns the list of relations specified in the
914 : * EXCEPT clause of the publication; otherwise, returns the list of relations
915 : * included in the publication.
916 : */
917 : static List *
918 680 : get_publication_relations(Oid pubid, PublicationPartOpt pub_partopt,
919 : bool except_flag)
920 : {
921 : List *result;
922 : Relation pubrelsrel;
923 : ScanKeyData scankey;
924 : SysScanDesc scan;
925 : HeapTuple tup;
926 :
927 : /* Find all relations associated with the publication. */
928 680 : pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
929 :
930 680 : ScanKeyInit(&scankey,
931 : Anum_pg_publication_rel_prpubid,
932 : BTEqualStrategyNumber, F_OIDEQ,
933 : ObjectIdGetDatum(pubid));
934 :
935 680 : scan = systable_beginscan(pubrelsrel, PublicationRelPrpubidIndexId,
936 : true, NULL, 1, &scankey);
937 :
938 680 : result = NIL;
939 2001 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
940 : {
941 : Form_pg_publication_rel pubrel;
942 :
943 641 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
944 :
945 641 : if (except_flag == pubrel->prexcept)
946 641 : result = GetPubPartitionOptionRelations(result, pub_partopt,
947 : pubrel->prrelid);
948 : }
949 :
950 680 : systable_endscan(scan);
951 680 : table_close(pubrelsrel, AccessShareLock);
952 :
953 : /* Now sort and de-duplicate the result list */
954 680 : list_sort(result, list_oid_cmp);
955 680 : list_deduplicate_oid(result);
956 :
957 680 : return result;
958 : }
959 :
960 : /*
961 : * Gets list of relation oids that are associated with a publication.
962 : *
963 : * This should only be used FOR TABLE publications, the FOR ALL TABLES/SEQUENCES
964 : * should use GetAllPublicationRelations().
965 : */
966 : List *
967 610 : GetIncludedPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
968 : {
969 : Assert(!GetPublication(pubid)->alltables);
970 :
971 610 : return get_publication_relations(pubid, pub_partopt, false);
972 : }
973 :
974 : /*
975 : * Gets list of table oids that were specified in the EXCEPT clause for a
976 : * publication.
977 : *
978 : * This should only be used FOR ALL TABLES publications.
979 : */
980 : List *
981 70 : GetExcludedPublicationTables(Oid pubid, PublicationPartOpt pub_partopt)
982 : {
983 : Assert(GetPublication(pubid)->alltables);
984 :
985 70 : return get_publication_relations(pubid, pub_partopt, true);
986 : }
987 :
988 : /*
989 : * Gets list of publication oids for publications marked as FOR ALL TABLES.
990 : */
991 : List *
992 5947 : GetAllTablesPublications(void)
993 : {
994 : List *result;
995 : Relation rel;
996 : ScanKeyData scankey;
997 : SysScanDesc scan;
998 : HeapTuple tup;
999 :
1000 : /* Find all publications that are marked as for all tables. */
1001 5947 : rel = table_open(PublicationRelationId, AccessShareLock);
1002 :
1003 5947 : ScanKeyInit(&scankey,
1004 : Anum_pg_publication_puballtables,
1005 : BTEqualStrategyNumber, F_BOOLEQ,
1006 : BoolGetDatum(true));
1007 :
1008 5947 : scan = systable_beginscan(rel, InvalidOid, false,
1009 : NULL, 1, &scankey);
1010 :
1011 5947 : result = NIL;
1012 6077 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
1013 : {
1014 130 : Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
1015 :
1016 130 : result = lappend_oid(result, oid);
1017 : }
1018 :
1019 5947 : systable_endscan(scan);
1020 5947 : table_close(rel, AccessShareLock);
1021 :
1022 5947 : return result;
1023 : }
1024 :
1025 : /*
1026 : * Gets list of all relations published by FOR ALL TABLES/SEQUENCES
1027 : * publication.
1028 : *
1029 : * If the publication publishes partition changes via their respective root
1030 : * partitioned tables, we must exclude partitions in favor of including the
1031 : * root partitioned tables. This is not applicable to FOR ALL SEQUENCES
1032 : * publication.
1033 : *
1034 : * For a FOR ALL TABLES publication, the returned list excludes tables mentioned
1035 : * in the EXCEPT clause.
1036 : */
1037 : List *
1038 55 : GetAllPublicationRelations(Oid pubid, char relkind, bool pubviaroot)
1039 : {
1040 : Relation classRel;
1041 : ScanKeyData key[1];
1042 : TableScanDesc scan;
1043 : HeapTuple tuple;
1044 55 : List *result = NIL;
1045 55 : List *exceptlist = NIL;
1046 :
1047 : Assert(!(relkind == RELKIND_SEQUENCE && pubviaroot));
1048 :
1049 : /* EXCEPT filtering applies only to relations, not sequences */
1050 55 : if (relkind == RELKIND_RELATION)
1051 49 : exceptlist = GetExcludedPublicationTables(pubid, pubviaroot ?
1052 49 : PUBLICATION_PART_ROOT :
1053 : PUBLICATION_PART_LEAF);
1054 :
1055 55 : classRel = table_open(RelationRelationId, AccessShareLock);
1056 :
1057 55 : ScanKeyInit(&key[0],
1058 : Anum_pg_class_relkind,
1059 : BTEqualStrategyNumber, F_CHAREQ,
1060 : CharGetDatum(relkind));
1061 :
1062 55 : scan = table_beginscan_catalog(classRel, 1, key);
1063 :
1064 3769 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1065 : {
1066 3714 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
1067 3714 : Oid relid = relForm->oid;
1068 :
1069 3714 : if (is_publishable_class(relid, relForm) &&
1070 135 : !(relForm->relispartition && pubviaroot) &&
1071 112 : !list_member_oid(exceptlist, relid))
1072 104 : result = lappend_oid(result, relid);
1073 : }
1074 :
1075 55 : table_endscan(scan);
1076 :
1077 55 : if (pubviaroot)
1078 : {
1079 4 : ScanKeyInit(&key[0],
1080 : Anum_pg_class_relkind,
1081 : BTEqualStrategyNumber, F_CHAREQ,
1082 : CharGetDatum(RELKIND_PARTITIONED_TABLE));
1083 :
1084 4 : scan = table_beginscan_catalog(classRel, 1, key);
1085 :
1086 21 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1087 : {
1088 17 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
1089 17 : Oid relid = relForm->oid;
1090 :
1091 17 : if (is_publishable_class(relid, relForm) &&
1092 17 : !relForm->relispartition &&
1093 13 : !list_member_oid(exceptlist, relid))
1094 12 : result = lappend_oid(result, relid);
1095 : }
1096 :
1097 4 : table_endscan(scan);
1098 : }
1099 :
1100 55 : table_close(classRel, AccessShareLock);
1101 55 : return result;
1102 : }
1103 :
1104 : /*
1105 : * Gets the list of schema oids for a publication.
1106 : *
1107 : * This should only be used FOR TABLES IN SCHEMA publications.
1108 : */
1109 : List *
1110 576 : GetPublicationSchemas(Oid pubid)
1111 : {
1112 576 : List *result = NIL;
1113 : Relation pubschsrel;
1114 : ScanKeyData scankey;
1115 : SysScanDesc scan;
1116 : HeapTuple tup;
1117 :
1118 : /* Find all schemas associated with the publication */
1119 576 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
1120 :
1121 576 : ScanKeyInit(&scankey,
1122 : Anum_pg_publication_namespace_pnpubid,
1123 : BTEqualStrategyNumber, F_OIDEQ,
1124 : ObjectIdGetDatum(pubid));
1125 :
1126 576 : scan = systable_beginscan(pubschsrel,
1127 : PublicationNamespacePnnspidPnpubidIndexId,
1128 : true, NULL, 1, &scankey);
1129 616 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
1130 : {
1131 : Form_pg_publication_namespace pubsch;
1132 :
1133 40 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1134 :
1135 40 : result = lappend_oid(result, pubsch->pnnspid);
1136 : }
1137 :
1138 576 : systable_endscan(scan);
1139 576 : table_close(pubschsrel, AccessShareLock);
1140 :
1141 576 : return result;
1142 : }
1143 :
1144 : /*
1145 : * Gets the list of publication oids associated with a specified schema.
1146 : */
1147 : List *
1148 8609 : GetSchemaPublications(Oid schemaid)
1149 : {
1150 8609 : List *result = NIL;
1151 : CatCList *pubschlist;
1152 : int i;
1153 :
1154 : /* Find all publications associated with the schema */
1155 8609 : pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
1156 : ObjectIdGetDatum(schemaid));
1157 8688 : for (i = 0; i < pubschlist->n_members; i++)
1158 : {
1159 79 : HeapTuple tup = &pubschlist->members[i]->tuple;
1160 79 : Oid pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
1161 :
1162 79 : result = lappend_oid(result, pubid);
1163 : }
1164 :
1165 8609 : ReleaseSysCacheList(pubschlist);
1166 :
1167 8609 : return result;
1168 : }
1169 :
1170 : /*
1171 : * Get the list of publishable relation oids for a specified schema.
1172 : */
1173 : List *
1174 310 : GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
1175 : {
1176 : Relation classRel;
1177 : ScanKeyData key[1];
1178 : TableScanDesc scan;
1179 : HeapTuple tuple;
1180 310 : List *result = NIL;
1181 :
1182 : Assert(OidIsValid(schemaid));
1183 :
1184 310 : classRel = table_open(RelationRelationId, AccessShareLock);
1185 :
1186 310 : ScanKeyInit(&key[0],
1187 : Anum_pg_class_relnamespace,
1188 : BTEqualStrategyNumber, F_OIDEQ,
1189 : ObjectIdGetDatum(schemaid));
1190 :
1191 : /* get all the relations present in the specified schema */
1192 310 : scan = table_beginscan_catalog(classRel, 1, key);
1193 17256 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1194 : {
1195 16946 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
1196 16946 : Oid relid = relForm->oid;
1197 : char relkind;
1198 :
1199 16946 : if (!is_publishable_class(relid, relForm))
1200 5839 : continue;
1201 :
1202 11107 : relkind = get_rel_relkind(relid);
1203 11107 : if (relkind == RELKIND_RELATION)
1204 9554 : result = lappend_oid(result, relid);
1205 1553 : else if (relkind == RELKIND_PARTITIONED_TABLE)
1206 : {
1207 495 : List *partitionrels = NIL;
1208 :
1209 : /*
1210 : * It is quite possible that some of the partitions are in a
1211 : * different schema than the parent table, so we need to get such
1212 : * partitions separately.
1213 : */
1214 495 : partitionrels = GetPubPartitionOptionRelations(partitionrels,
1215 : pub_partopt,
1216 : relForm->oid);
1217 495 : result = list_concat_unique_oid(result, partitionrels);
1218 : }
1219 : }
1220 :
1221 310 : table_endscan(scan);
1222 310 : table_close(classRel, AccessShareLock);
1223 310 : return result;
1224 : }
1225 :
1226 : /*
1227 : * Gets the list of all relations published by FOR TABLES IN SCHEMA
1228 : * publication.
1229 : */
1230 : List *
1231 283 : GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
1232 : {
1233 283 : List *result = NIL;
1234 283 : List *pubschemalist = GetPublicationSchemas(pubid);
1235 : ListCell *cell;
1236 :
1237 303 : foreach(cell, pubschemalist)
1238 : {
1239 20 : Oid schemaid = lfirst_oid(cell);
1240 20 : List *schemaRels = NIL;
1241 :
1242 20 : schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt);
1243 20 : result = list_concat(result, schemaRels);
1244 : }
1245 :
1246 283 : return result;
1247 : }
1248 :
1249 : /*
1250 : * Get publication using oid
1251 : *
1252 : * The Publication struct and its data are palloc'ed here.
1253 : */
1254 : Publication *
1255 4198 : GetPublication(Oid pubid)
1256 : {
1257 : HeapTuple tup;
1258 : Publication *pub;
1259 : Form_pg_publication pubform;
1260 :
1261 4198 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1262 4198 : if (!HeapTupleIsValid(tup))
1263 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1264 :
1265 4198 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1266 :
1267 4198 : pub = palloc_object(Publication);
1268 4198 : pub->oid = pubid;
1269 4198 : pub->name = pstrdup(NameStr(pubform->pubname));
1270 4198 : pub->alltables = pubform->puballtables;
1271 4198 : pub->allsequences = pubform->puballsequences;
1272 4198 : pub->pubactions.pubinsert = pubform->pubinsert;
1273 4198 : pub->pubactions.pubupdate = pubform->pubupdate;
1274 4198 : pub->pubactions.pubdelete = pubform->pubdelete;
1275 4198 : pub->pubactions.pubtruncate = pubform->pubtruncate;
1276 4198 : pub->pubviaroot = pubform->pubviaroot;
1277 4198 : pub->pubgencols_type = pubform->pubgencols;
1278 :
1279 4198 : ReleaseSysCache(tup);
1280 :
1281 4198 : return pub;
1282 : }
1283 :
1284 : /*
1285 : * Get Publication using name.
1286 : */
1287 : Publication *
1288 1792 : GetPublicationByName(const char *pubname, bool missing_ok)
1289 : {
1290 : Oid oid;
1291 :
1292 1792 : oid = get_publication_oid(pubname, missing_ok);
1293 :
1294 1792 : return OidIsValid(oid) ? GetPublication(oid) : NULL;
1295 : }
1296 :
1297 : /*
1298 : * A helper function for pg_get_publication_tables() to check whether the
1299 : * table with the given relid is published in the specified publication.
1300 : *
1301 : * This function evaluates the effective published OID based on the
1302 : * publish_via_partition_root setting, rather than just checking catalog entries
1303 : * (e.g., pg_publication_rel). For instance, when publish_via_partition_root is
1304 : * false, it returns false for a parent partitioned table and returns true
1305 : * for its leaf partitions, even if the parent is the one explicitly added
1306 : * to the publication.
1307 : *
1308 : * For performance reasons, this function avoids the overhead of constructing
1309 : * the complete list of published tables during the evaluation. It can execute
1310 : * quickly even when the publication contains a large number of relations.
1311 : *
1312 : * Note: this leaks memory for the ancestors list into the current memory
1313 : * context.
1314 : */
1315 : static bool
1316 987 : is_table_publishable_in_publication(Oid relid, Publication *pub)
1317 : {
1318 : bool relispartition;
1319 987 : List *ancestors = NIL;
1320 :
1321 : /*
1322 : * For non-pubviaroot publications, a partitioned table is never the
1323 : * effective published OID; only its leaf partitions can be.
1324 : */
1325 987 : if (!pub->pubviaroot && get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE)
1326 86 : return false;
1327 :
1328 901 : relispartition = get_rel_relispartition(relid);
1329 :
1330 901 : if (relispartition)
1331 171 : ancestors = get_partition_ancestors(relid);
1332 :
1333 901 : if (pub->alltables)
1334 : {
1335 : /*
1336 : * ALL TABLES with pubviaroot includes only regular tables or top-most
1337 : * partitioned tables -- never child partitions.
1338 : */
1339 202 : if (pub->pubviaroot && relispartition)
1340 12 : return false;
1341 :
1342 : /*
1343 : * For ALL TABLES publications, the table is published unless it
1344 : * appears in the EXCEPT clause. Only the top-most can appear in the
1345 : * EXCEPT clause, so exclusion must be evaluated at the top-most
1346 : * ancestor if it has. These publications store only EXCEPT'ed tables
1347 : * in pg_publication_rel, so checking existence is sufficient.
1348 : *
1349 : * Note that this existence check below would incorrectly return true
1350 : * (published) for partitions when pubviaroot is enabled; however,
1351 : * that case is already caught and returned false by the above check.
1352 : */
1353 190 : return !SearchSysCacheExists2(PUBLICATIONRELMAP,
1354 : ObjectIdGetDatum(ancestors
1355 : ? llast_oid(ancestors) : relid),
1356 : ObjectIdGetDatum(pub->oid));
1357 : }
1358 :
1359 : /*
1360 : * Non-ALL-TABLE publication cases.
1361 : *
1362 : * A table is published if it (or a containing schema) was explicitly
1363 : * added, or if it is a partition whose ancestor was added.
1364 : */
1365 :
1366 : /*
1367 : * If an ancestor is published, the partition's status depends on
1368 : * publish_via_partition_root value.
1369 : *
1370 : * If it's true, the ancestor's relation OID is the effective published
1371 : * OID, so the partition itself should be excluded (return false).
1372 : *
1373 : * If it's false, the partition is covered by its ancestor's presence in
1374 : * the publication, it should be included (return true).
1375 : */
1376 840 : if (relispartition &&
1377 141 : OidIsValid(GetTopMostAncestorInPublication(pub->oid, ancestors, NULL)))
1378 44 : return !pub->pubviaroot;
1379 :
1380 : /*
1381 : * Check whether the table is explicitly published via pg_publication_rel
1382 : * or pg_publication_namespace.
1383 : */
1384 655 : return (SearchSysCacheExists2(PUBLICATIONRELMAP,
1385 : ObjectIdGetDatum(relid),
1386 1025 : ObjectIdGetDatum(pub->oid)) ||
1387 370 : SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
1388 : ObjectIdGetDatum(get_rel_namespace(relid)),
1389 : ObjectIdGetDatum(pub->oid)));
1390 : }
1391 :
1392 : /*
1393 : * Helper function to get information of the tables in the given
1394 : * publication(s).
1395 : *
1396 : * If filter_by_relid is true, only the row(s) for target_relid is returned;
1397 : * if target_relid does not exist or is not part of the publications, zero
1398 : * rows are returned. If filter_by_relid is false, rows for all tables
1399 : * within the specified publications are returned and target_relid is
1400 : * ignored.
1401 : *
1402 : * Returns pubid, relid, column list, and row filter for each table.
1403 : */
1404 : static Datum
1405 1602 : pg_get_publication_tables(FunctionCallInfo fcinfo, ArrayType *pubnames,
1406 : Oid target_relid, bool filter_by_relid,
1407 : bool pub_missing_ok)
1408 : {
1409 : #define NUM_PUBLICATION_TABLES_ELEM 4
1410 : FuncCallContext *funcctx;
1411 1602 : List *table_infos = NIL;
1412 :
1413 : /* stuff done only on the first call of the function */
1414 1602 : if (SRF_IS_FIRSTCALL())
1415 : {
1416 : TupleDesc tupdesc;
1417 : MemoryContext oldcontext;
1418 : Datum *elems;
1419 : int nelems,
1420 : i;
1421 755 : bool viaroot = false;
1422 :
1423 : /* create a function context for cross-call persistence */
1424 755 : funcctx = SRF_FIRSTCALL_INIT();
1425 :
1426 : /*
1427 : * Preliminary check if the specified table can be published in the
1428 : * first place. If not, we can return early without checking the given
1429 : * publications and the table.
1430 : */
1431 755 : if (filter_by_relid && !is_publishable_table(target_relid))
1432 8 : SRF_RETURN_DONE(funcctx);
1433 :
1434 : /* switch to memory context appropriate for multiple function calls */
1435 747 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1436 :
1437 : /*
1438 : * Deconstruct the parameter into elements where each element is a
1439 : * publication name.
1440 : */
1441 747 : deconstruct_array_builtin(pubnames, TEXTOID, &elems, NULL, &nelems);
1442 :
1443 : /* Get Oids of tables from each publication. */
1444 1992 : for (i = 0; i < nelems; i++)
1445 : {
1446 : Publication *pub_elem;
1447 1245 : List *pub_elem_tables = NIL;
1448 : ListCell *lc;
1449 :
1450 1245 : pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]),
1451 : pub_missing_ok);
1452 :
1453 1245 : if (pub_elem == NULL)
1454 6 : continue;
1455 :
1456 1239 : if (filter_by_relid)
1457 : {
1458 : /* Check if the given table is published for the publication */
1459 987 : if (is_table_publishable_in_publication(target_relid, pub_elem))
1460 : {
1461 501 : pub_elem_tables = list_make1_oid(target_relid);
1462 : }
1463 : }
1464 : else
1465 : {
1466 : /*
1467 : * Publications support partitioned tables. If
1468 : * publish_via_partition_root is false, all changes are
1469 : * replicated using leaf partition identity and schema, so we
1470 : * only need those. Otherwise, get the partitioned table
1471 : * itself.
1472 : */
1473 252 : if (pub_elem->alltables)
1474 49 : pub_elem_tables = GetAllPublicationRelations(pub_elem->oid,
1475 : RELKIND_RELATION,
1476 49 : pub_elem->pubviaroot);
1477 : else
1478 : {
1479 : List *relids,
1480 : *schemarelids;
1481 :
1482 203 : relids = GetIncludedPublicationRelations(pub_elem->oid,
1483 203 : pub_elem->pubviaroot ?
1484 203 : PUBLICATION_PART_ROOT :
1485 : PUBLICATION_PART_LEAF);
1486 203 : schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
1487 203 : pub_elem->pubviaroot ?
1488 203 : PUBLICATION_PART_ROOT :
1489 : PUBLICATION_PART_LEAF);
1490 203 : pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
1491 : }
1492 : }
1493 :
1494 : /*
1495 : * Record the published table and the corresponding publication so
1496 : * that we can get row filters and column lists later.
1497 : *
1498 : * When a table is published by multiple publications, to obtain
1499 : * all row filters and column lists, the structure related to this
1500 : * table will be recorded multiple times.
1501 : */
1502 2116 : foreach(lc, pub_elem_tables)
1503 : {
1504 877 : published_rel *table_info = palloc_object(published_rel);
1505 :
1506 877 : table_info->relid = lfirst_oid(lc);
1507 877 : table_info->pubid = pub_elem->oid;
1508 877 : table_infos = lappend(table_infos, table_info);
1509 : }
1510 :
1511 : /* At least one publication is using publish_via_partition_root. */
1512 1239 : if (pub_elem->pubviaroot)
1513 249 : viaroot = true;
1514 : }
1515 :
1516 : /*
1517 : * If the publication publishes partition changes via their respective
1518 : * root partitioned tables, we must exclude partitions in favor of
1519 : * including the root partitioned tables. Otherwise, the function
1520 : * could return both the child and parent tables which could cause
1521 : * data of the child table to be double-published on the subscriber
1522 : * side.
1523 : */
1524 747 : if (viaroot)
1525 169 : filter_partitions(table_infos);
1526 :
1527 : /* Construct a tuple descriptor for the result rows. */
1528 747 : tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
1529 747 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
1530 : OIDOID, -1, 0);
1531 747 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
1532 : OIDOID, -1, 0);
1533 747 : TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
1534 : INT2VECTOROID, -1, 0);
1535 747 : TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
1536 : PG_NODE_TREEOID, -1, 0);
1537 :
1538 747 : TupleDescFinalize(tupdesc);
1539 747 : funcctx->tuple_desc = BlessTupleDesc(tupdesc);
1540 747 : funcctx->user_fctx = table_infos;
1541 :
1542 747 : MemoryContextSwitchTo(oldcontext);
1543 : }
1544 :
1545 : /* stuff done on every call of the function */
1546 1594 : funcctx = SRF_PERCALL_SETUP();
1547 1594 : table_infos = (List *) funcctx->user_fctx;
1548 :
1549 1594 : if (funcctx->call_cntr < list_length(table_infos))
1550 : {
1551 847 : HeapTuple pubtuple = NULL;
1552 : HeapTuple rettuple;
1553 : Publication *pub;
1554 847 : published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
1555 847 : Oid relid = table_info->relid;
1556 847 : Oid schemaid = get_rel_namespace(relid);
1557 847 : Datum values[NUM_PUBLICATION_TABLES_ELEM] = {0};
1558 847 : bool nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
1559 :
1560 : /*
1561 : * Form tuple with appropriate data.
1562 : */
1563 :
1564 847 : pub = GetPublication(table_info->pubid);
1565 :
1566 847 : values[0] = ObjectIdGetDatum(pub->oid);
1567 847 : values[1] = ObjectIdGetDatum(relid);
1568 :
1569 : /*
1570 : * We don't consider row filters or column lists for FOR ALL TABLES or
1571 : * FOR TABLES IN SCHEMA publications.
1572 : */
1573 847 : if (!pub->alltables &&
1574 576 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
1575 : ObjectIdGetDatum(schemaid),
1576 : ObjectIdGetDatum(pub->oid)))
1577 540 : pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
1578 : ObjectIdGetDatum(relid),
1579 : ObjectIdGetDatum(pub->oid));
1580 :
1581 847 : if (HeapTupleIsValid(pubtuple))
1582 : {
1583 : /* Lookup the column list attribute. */
1584 501 : values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1585 : Anum_pg_publication_rel_prattrs,
1586 : &(nulls[2]));
1587 :
1588 : /* Null indicates no filter. */
1589 501 : values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1590 : Anum_pg_publication_rel_prqual,
1591 : &(nulls[3]));
1592 : }
1593 : else
1594 : {
1595 346 : nulls[2] = true;
1596 346 : nulls[3] = true;
1597 : }
1598 :
1599 : /* Show all columns when the column list is not specified. */
1600 847 : if (nulls[2])
1601 : {
1602 751 : Relation rel = table_open(relid, AccessShareLock);
1603 751 : int nattnums = 0;
1604 : int16 *attnums;
1605 751 : TupleDesc desc = RelationGetDescr(rel);
1606 : int i;
1607 :
1608 751 : attnums = palloc_array(int16, desc->natts);
1609 :
1610 1984 : for (i = 0; i < desc->natts; i++)
1611 : {
1612 1233 : Form_pg_attribute att = TupleDescAttr(desc, i);
1613 :
1614 1233 : if (att->attisdropped)
1615 4 : continue;
1616 :
1617 1229 : if (att->attgenerated)
1618 : {
1619 : /* We only support replication of STORED generated cols. */
1620 22 : if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
1621 10 : continue;
1622 :
1623 : /*
1624 : * User hasn't requested to replicate STORED generated
1625 : * cols.
1626 : */
1627 12 : if (pub->pubgencols_type != PUBLISH_GENCOLS_STORED)
1628 9 : continue;
1629 : }
1630 :
1631 1210 : attnums[nattnums++] = att->attnum;
1632 : }
1633 :
1634 751 : if (nattnums > 0)
1635 : {
1636 747 : values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
1637 747 : nulls[2] = false;
1638 : }
1639 :
1640 751 : table_close(rel, AccessShareLock);
1641 : }
1642 :
1643 847 : rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
1644 :
1645 847 : SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
1646 : }
1647 :
1648 747 : SRF_RETURN_DONE(funcctx);
1649 : }
1650 :
1651 : Datum
1652 550 : pg_get_publication_tables_a(PG_FUNCTION_ARGS)
1653 : {
1654 : /*
1655 : * Get information for all tables in the given publications.
1656 : * filter_by_relid is false so all tables are returned; pub_missing_ok is
1657 : * false for backward compatibility.
1658 : */
1659 550 : return pg_get_publication_tables(fcinfo, PG_GETARG_ARRAYTYPE_P(0),
1660 : InvalidOid, false, false);
1661 : }
1662 :
1663 : Datum
1664 1052 : pg_get_publication_tables_b(PG_FUNCTION_ARGS)
1665 : {
1666 : /*
1667 : * Get information for the specified table in the given publications. The
1668 : * SQL-level function is declared STRICT, so target_relid is guaranteed to
1669 : * be non-NULL here.
1670 : */
1671 1052 : return pg_get_publication_tables(fcinfo, PG_GETARG_ARRAYTYPE_P(0),
1672 : PG_GETARG_OID(1), true, true);
1673 : }
1674 :
1675 : /*
1676 : * Returns Oids of sequences in a publication.
1677 : */
1678 : Datum
1679 234 : pg_get_publication_sequences(PG_FUNCTION_ARGS)
1680 : {
1681 : FuncCallContext *funcctx;
1682 234 : List *sequences = NIL;
1683 :
1684 : /* stuff done only on the first call of the function */
1685 234 : if (SRF_IS_FIRSTCALL())
1686 : {
1687 217 : char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1688 : Publication *publication;
1689 : MemoryContext oldcontext;
1690 :
1691 : /* create a function context for cross-call persistence */
1692 217 : funcctx = SRF_FIRSTCALL_INIT();
1693 :
1694 : /* switch to memory context appropriate for multiple function calls */
1695 217 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1696 :
1697 217 : publication = GetPublicationByName(pubname, false);
1698 :
1699 217 : if (publication->allsequences)
1700 6 : sequences = GetAllPublicationRelations(publication->oid,
1701 : RELKIND_SEQUENCE,
1702 : false);
1703 :
1704 217 : funcctx->user_fctx = sequences;
1705 :
1706 217 : MemoryContextSwitchTo(oldcontext);
1707 : }
1708 :
1709 : /* stuff done on every call of the function */
1710 234 : funcctx = SRF_PERCALL_SETUP();
1711 234 : sequences = (List *) funcctx->user_fctx;
1712 :
1713 234 : if (funcctx->call_cntr < list_length(sequences))
1714 : {
1715 17 : Oid relid = list_nth_oid(sequences, funcctx->call_cntr);
1716 :
1717 17 : SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
1718 : }
1719 :
1720 217 : SRF_RETURN_DONE(funcctx);
1721 : }
|