Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_publication.c
4 : * publication C API manipulation
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/catalog/pg_publication.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/heapam.h"
19 : #include "access/htup_details.h"
20 : #include "access/tableam.h"
21 : #include "catalog/catalog.h"
22 : #include "catalog/dependency.h"
23 : #include "catalog/indexing.h"
24 : #include "catalog/namespace.h"
25 : #include "catalog/objectaddress.h"
26 : #include "catalog/partition.h"
27 : #include "catalog/pg_inherits.h"
28 : #include "catalog/pg_namespace.h"
29 : #include "catalog/pg_publication.h"
30 : #include "catalog/pg_publication_namespace.h"
31 : #include "catalog/pg_publication_rel.h"
32 : #include "catalog/pg_type.h"
33 : #include "commands/publicationcmds.h"
34 : #include "funcapi.h"
35 : #include "utils/array.h"
36 : #include "utils/builtins.h"
37 : #include "utils/catcache.h"
38 : #include "utils/fmgroids.h"
39 : #include "utils/lsyscache.h"
40 : #include "utils/rel.h"
41 : #include "utils/syscache.h"
42 :
43 : /* Records association between publication and published table */
44 : typedef struct
45 : {
46 : Oid relid; /* OID of published table */
47 : Oid pubid; /* OID of publication that publishes this
48 : * table. */
49 : } published_rel;
50 :
51 : /*
52 : * Check if relation can be in given publication and throws appropriate
53 : * error if not.
54 : */
55 : static void
56 808 : check_publication_add_relation(PublicationRelInfo *pri)
57 : {
58 808 : Relation targetrel = pri->relation;
59 : const char *relname;
60 : const char *errormsg;
61 :
62 808 : if (pri->except)
63 : {
64 81 : relname = RelationGetQualifiedRelationName(targetrel);
65 81 : errormsg = gettext_noop("cannot specify relation \"%s\" in the publication EXCEPT clause");
66 : }
67 : else
68 : {
69 727 : relname = RelationGetRelationName(targetrel);
70 727 : errormsg = gettext_noop("cannot add relation \"%s\" to publication");
71 : }
72 :
73 : /* If in EXCEPT clause, must be root partitioned table */
74 808 : if (pri->except && targetrel->rd_rel->relispartition)
75 4 : ereport(ERROR,
76 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
77 : errmsg(errormsg, relname),
78 : errdetail("This operation is not supported for individual partitions.")));
79 :
80 : /* Must be a regular or partitioned table */
81 804 : if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
82 115 : RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
83 9 : ereport(ERROR,
84 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
85 : errmsg(errormsg, relname),
86 : errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind)));
87 :
88 : /* Can't be system table */
89 795 : if (IsCatalogRelation(targetrel))
90 4 : ereport(ERROR,
91 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
92 : errmsg(errormsg, relname),
93 : errdetail("This operation is not supported for system tables.")));
94 :
95 : /* UNLOGGED and TEMP relations cannot be part of publication. */
96 791 : if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
97 4 : ereport(ERROR,
98 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
99 : errmsg(errormsg, relname),
100 : errdetail("This operation is not supported for temporary tables.")));
101 787 : else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
102 4 : ereport(ERROR,
103 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
104 : errmsg(errormsg, relname),
105 : errdetail("This operation is not supported for unlogged tables.")));
106 783 : }
107 :
108 : /*
109 : * Check if schema can be in given publication and throw appropriate error if
110 : * not.
111 : */
112 : static void
113 163 : check_publication_add_schema(Oid schemaid)
114 : {
115 : /* Can't be system namespace */
116 163 : if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid))
117 4 : ereport(ERROR,
118 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
119 : errmsg("cannot add schema \"%s\" to publication",
120 : get_namespace_name(schemaid)),
121 : errdetail("This operation is not supported for system schemas.")));
122 :
123 : /* Can't be temporary namespace */
124 159 : if (isAnyTempNamespace(schemaid))
125 0 : ereport(ERROR,
126 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
127 : errmsg("cannot add schema \"%s\" to publication",
128 : get_namespace_name(schemaid)),
129 : errdetail("Temporary schemas cannot be replicated.")));
130 159 : }
131 :
132 : /*
133 : * Returns if relation represented by oid and Form_pg_class entry
134 : * is publishable.
135 : *
136 : * Does same checks as check_publication_add_relation() above except for
137 : * RELKIND_SEQUENCE, but does not need relation to be opened and also does
138 : * not throw errors. Here, the additional check is to support ALL SEQUENCES
139 : * publication.
140 : *
141 : * XXX This also excludes all tables with relid < FirstNormalObjectId,
142 : * ie all tables created during initdb. This mainly affects the preinstalled
143 : * information_schema. IsCatalogRelationOid() only excludes tables with
144 : * relid < FirstUnpinnedObjectId, making that test rather redundant,
145 : * but really we should get rid of the FirstNormalObjectId test not
146 : * IsCatalogRelationOid. We can't do so today because we don't want
147 : * information_schema tables to be considered publishable; but this test
148 : * is really inadequate for that, since the information_schema could be
149 : * dropped and reloaded and then it'll be considered publishable. The best
150 : * long-term solution may be to add a "relispublishable" bool to pg_class,
151 : * and depend on that instead of OID checks.
152 : */
153 : static bool
154 323172 : is_publishable_class(Oid relid, Form_pg_class reltuple)
155 : {
156 331444 : return (reltuple->relkind == RELKIND_RELATION ||
157 8272 : reltuple->relkind == RELKIND_PARTITIONED_TABLE ||
158 7264 : reltuple->relkind == RELKIND_SEQUENCE) &&
159 317279 : !IsCatalogRelationOid(relid) &&
160 646344 : reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
161 : relid >= FirstNormalObjectId;
162 : }
163 :
164 : /*
165 : * Another variant of is_publishable_class(), taking a Relation.
166 : */
167 : bool
168 297657 : is_publishable_relation(Relation rel)
169 : {
170 297657 : return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
171 : }
172 :
173 : /*
174 : * Similar to is_publishable_class() but checks whether the given OID
175 : * is a publishable "table" or not.
176 : */
177 : static bool
178 554 : is_publishable_table(Oid tableoid)
179 : {
180 : HeapTuple tuple;
181 : Form_pg_class relform;
182 :
183 554 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(tableoid));
184 554 : if (!HeapTupleIsValid(tuple))
185 4 : return false;
186 :
187 550 : relform = (Form_pg_class) GETSTRUCT(tuple);
188 :
189 : /*
190 : * is_publishable_class() includes sequences, so we need to explicitly
191 : * check the relkind to filter them out here.
192 : */
193 1100 : if (relform->relkind != RELKIND_SEQUENCE &&
194 550 : is_publishable_class(tableoid, relform))
195 : {
196 546 : ReleaseSysCache(tuple);
197 546 : return true;
198 : }
199 :
200 4 : ReleaseSysCache(tuple);
201 4 : return false;
202 : }
203 :
204 : /*
205 : * SQL-callable variant of the above
206 : *
207 : * This returns null when the relation does not exist. This is intended to be
208 : * used for example in psql to avoid gratuitous errors when there are
209 : * concurrent catalog changes.
210 : */
211 : Datum
212 4288 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
213 : {
214 4288 : Oid relid = PG_GETARG_OID(0);
215 : HeapTuple tuple;
216 : bool result;
217 :
218 4288 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
219 4288 : if (!HeapTupleIsValid(tuple))
220 0 : PG_RETURN_NULL();
221 4288 : result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
222 4288 : ReleaseSysCache(tuple);
223 4288 : PG_RETURN_BOOL(result);
224 : }
225 :
226 : /*
227 : * Returns true if the ancestor is in the list of published relations.
228 : * Otherwise, returns false.
229 : */
230 : static bool
231 61 : is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
232 : {
233 : ListCell *lc;
234 :
235 278 : foreach(lc, table_infos)
236 : {
237 247 : Oid relid = ((published_rel *) lfirst(lc))->relid;
238 :
239 247 : if (relid == ancestor)
240 30 : return true;
241 : }
242 :
243 31 : return false;
244 : }
245 :
246 : /*
247 : * Filter out the partitions whose parent tables are also present in the list.
248 : */
249 : static void
250 169 : filter_partitions(List *table_infos)
251 : {
252 : ListCell *lc;
253 :
254 399 : foreach(lc, table_infos)
255 : {
256 230 : bool skip = false;
257 230 : List *ancestors = NIL;
258 : ListCell *lc2;
259 230 : published_rel *table_info = (published_rel *) lfirst(lc);
260 :
261 230 : if (get_rel_relispartition(table_info->relid))
262 61 : ancestors = get_partition_ancestors(table_info->relid);
263 :
264 261 : foreach(lc2, ancestors)
265 : {
266 61 : Oid ancestor = lfirst_oid(lc2);
267 :
268 61 : if (is_ancestor_member_tableinfos(ancestor, table_infos))
269 : {
270 30 : skip = true;
271 30 : break;
272 : }
273 : }
274 :
275 230 : if (skip)
276 30 : table_infos = foreach_delete_current(table_infos, lc);
277 : }
278 169 : }
279 :
280 : /*
281 : * Returns true if any schema is associated with the publication, false if no
282 : * schema is associated with the publication.
283 : */
284 : bool
285 215 : is_schema_publication(Oid pubid)
286 : {
287 : Relation pubschsrel;
288 : ScanKeyData scankey;
289 : SysScanDesc scan;
290 : HeapTuple tup;
291 215 : bool result = false;
292 :
293 215 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
294 215 : ScanKeyInit(&scankey,
295 : Anum_pg_publication_namespace_pnpubid,
296 : BTEqualStrategyNumber, F_OIDEQ,
297 : ObjectIdGetDatum(pubid));
298 :
299 215 : scan = systable_beginscan(pubschsrel,
300 : PublicationNamespacePnnspidPnpubidIndexId,
301 : true, NULL, 1, &scankey);
302 215 : tup = systable_getnext(scan);
303 215 : result = HeapTupleIsValid(tup);
304 :
305 215 : systable_endscan(scan);
306 215 : table_close(pubschsrel, AccessShareLock);
307 :
308 215 : return result;
309 : }
310 :
311 : /*
312 : * Returns true if the publication has explicitly included relation (i.e.,
313 : * not marked as EXCEPT).
314 : */
315 : bool
316 49 : is_table_publication(Oid pubid)
317 : {
318 : Relation pubrelsrel;
319 : ScanKeyData scankey;
320 : SysScanDesc scan;
321 : HeapTuple tup;
322 49 : bool result = false;
323 :
324 49 : pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
325 49 : ScanKeyInit(&scankey,
326 : Anum_pg_publication_rel_prpubid,
327 : BTEqualStrategyNumber, F_OIDEQ,
328 : ObjectIdGetDatum(pubid));
329 :
330 49 : scan = systable_beginscan(pubrelsrel,
331 : PublicationRelPrpubidIndexId,
332 : true, NULL, 1, &scankey);
333 49 : tup = systable_getnext(scan);
334 49 : if (HeapTupleIsValid(tup))
335 : {
336 : Form_pg_publication_rel pubrel;
337 :
338 25 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
339 :
340 : /*
341 : * For any publication, pg_publication_rel contains either only EXCEPT
342 : * entries or only explicitly included tables. Therefore, examining
343 : * the first tuple is sufficient to determine table inclusion.
344 : */
345 25 : result = !pubrel->prexcept;
346 : }
347 :
348 49 : systable_endscan(scan);
349 49 : table_close(pubrelsrel, AccessShareLock);
350 :
351 49 : return result;
352 : }
353 :
354 : /*
355 : * Returns true if the relation has column list associated with the
356 : * publication, false otherwise.
357 : *
358 : * If a column list is found, the corresponding bitmap is returned through the
359 : * cols parameter, if provided. The bitmap is constructed within the given
360 : * memory context (mcxt).
361 : */
362 : bool
363 924 : check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt,
364 : Bitmapset **cols)
365 : {
366 : HeapTuple cftuple;
367 924 : bool found = false;
368 :
369 924 : if (pub->alltables)
370 221 : return false;
371 :
372 703 : cftuple = SearchSysCache2(PUBLICATIONRELMAP,
373 : ObjectIdGetDatum(relid),
374 : ObjectIdGetDatum(pub->oid));
375 703 : if (HeapTupleIsValid(cftuple))
376 : {
377 : Datum cfdatum;
378 : bool isnull;
379 :
380 : /* Lookup the column list attribute. */
381 645 : cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
382 : Anum_pg_publication_rel_prattrs, &isnull);
383 :
384 : /* Was a column list found? */
385 645 : if (!isnull)
386 : {
387 : /* Build the column list bitmap in the given memory context. */
388 193 : if (cols)
389 190 : *cols = pub_collist_to_bitmapset(*cols, cfdatum, mcxt);
390 :
391 193 : found = true;
392 : }
393 :
394 645 : ReleaseSysCache(cftuple);
395 : }
396 :
397 703 : return found;
398 : }
399 :
400 : /*
401 : * Gets the relations based on the publication partition option for a specified
402 : * relation.
403 : */
404 : List *
405 2498 : GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
406 : Oid relid)
407 : {
408 2498 : if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
409 : pub_partopt != PUBLICATION_PART_ROOT)
410 701 : {
411 701 : List *all_parts = find_all_inheritors(relid, NoLock,
412 : NULL);
413 :
414 701 : if (pub_partopt == PUBLICATION_PART_ALL)
415 682 : result = list_concat(result, all_parts);
416 19 : else if (pub_partopt == PUBLICATION_PART_LEAF)
417 : {
418 : ListCell *lc;
419 :
420 69 : foreach(lc, all_parts)
421 : {
422 50 : Oid partOid = lfirst_oid(lc);
423 :
424 50 : if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
425 30 : result = lappend_oid(result, partOid);
426 : }
427 : }
428 : else
429 : Assert(false);
430 : }
431 : else
432 1797 : result = lappend_oid(result, relid);
433 :
434 2498 : return result;
435 : }
436 :
437 : /*
438 : * Returns the relid of the topmost ancestor that is published via this
439 : * publication if any and set its ancestor level to ancestor_level,
440 : * otherwise returns InvalidOid.
441 : *
442 : * The ancestor_level value allows us to compare the results for multiple
443 : * publications, and decide which value is higher up.
444 : *
445 : * Note that the list of ancestors should be ordered such that the topmost
446 : * ancestor is at the end of the list.
447 : */
448 : Oid
449 426 : GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
450 : {
451 : ListCell *lc;
452 426 : Oid topmost_relid = InvalidOid;
453 426 : int level = 0;
454 :
455 : /*
456 : * Find the "topmost" ancestor that is in this publication.
457 : */
458 862 : foreach(lc, ancestors)
459 : {
460 436 : Oid ancestor = lfirst_oid(lc);
461 436 : List *apubids = GetRelationIncludedPublications(ancestor);
462 436 : List *aschemaPubids = NIL;
463 :
464 436 : level++;
465 :
466 436 : if (list_member_oid(apubids, puboid))
467 : {
468 223 : topmost_relid = ancestor;
469 :
470 223 : if (ancestor_level)
471 43 : *ancestor_level = level;
472 : }
473 : else
474 : {
475 213 : aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
476 213 : if (list_member_oid(aschemaPubids, puboid))
477 : {
478 13 : topmost_relid = ancestor;
479 :
480 13 : if (ancestor_level)
481 5 : *ancestor_level = level;
482 : }
483 : }
484 :
485 436 : list_free(apubids);
486 436 : list_free(aschemaPubids);
487 : }
488 :
489 426 : return topmost_relid;
490 : }
491 :
492 : /*
493 : * attnumstoint2vector
494 : * Convert a Bitmapset of AttrNumbers into an int2vector.
495 : *
496 : * AttrNumber numbers are 0-based, i.e., not offset by
497 : * FirstLowInvalidHeapAttributeNumber.
498 : */
499 : static int2vector *
500 212 : attnumstoint2vector(Bitmapset *attrs)
501 : {
502 : int2vector *result;
503 212 : int n = bms_num_members(attrs);
504 212 : int i = -1;
505 212 : int j = 0;
506 :
507 212 : result = buildint2vector(NULL, n);
508 :
509 577 : while ((i = bms_next_member(attrs, i)) >= 0)
510 : {
511 : Assert(i <= PG_INT16_MAX);
512 :
513 365 : result->values[j++] = (int16) i;
514 : }
515 :
516 212 : return result;
517 : }
518 :
519 : /*
520 : * Insert new publication / relation mapping.
521 : */
522 : ObjectAddress
523 830 : publication_add_relation(Oid pubid, PublicationRelInfo *pri,
524 : bool if_not_exists, AlterPublicationStmt *alter_stmt)
525 : {
526 : Relation rel;
527 : HeapTuple tup;
528 : Datum values[Natts_pg_publication_rel];
529 : bool nulls[Natts_pg_publication_rel];
530 830 : Relation targetrel = pri->relation;
531 830 : Oid relid = RelationGetRelid(targetrel);
532 : Oid pubreloid;
533 : Bitmapset *attnums;
534 830 : Publication *pub = GetPublication(pubid);
535 : ObjectAddress myself,
536 : referenced;
537 830 : List *relids = NIL;
538 : int i;
539 : bool inval_except_table;
540 :
541 830 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
542 :
543 : /*
544 : * Check for duplicates. Note that this does not really prevent
545 : * duplicates, it's here just to provide nicer error message in common
546 : * case. The real protection is the unique key on the catalog.
547 : */
548 830 : if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
549 : ObjectIdGetDatum(pubid)))
550 : {
551 22 : table_close(rel, RowExclusiveLock);
552 :
553 22 : if (if_not_exists)
554 18 : return InvalidObjectAddress;
555 :
556 4 : ereport(ERROR,
557 : (errcode(ERRCODE_DUPLICATE_OBJECT),
558 : errmsg("relation \"%s\" is already member of publication \"%s\"",
559 : RelationGetRelationName(targetrel), pub->name)));
560 : }
561 :
562 808 : check_publication_add_relation(pri);
563 :
564 : /* Validate and translate column names into a Bitmapset of attnums. */
565 783 : attnums = pub_collist_validate(pri->relation, pri->columns);
566 :
567 : /* Form a tuple. */
568 767 : memset(values, 0, sizeof(values));
569 767 : memset(nulls, false, sizeof(nulls));
570 :
571 767 : pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
572 : Anum_pg_publication_rel_oid);
573 767 : values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid);
574 767 : values[Anum_pg_publication_rel_prpubid - 1] =
575 767 : ObjectIdGetDatum(pubid);
576 767 : values[Anum_pg_publication_rel_prrelid - 1] =
577 767 : ObjectIdGetDatum(relid);
578 767 : values[Anum_pg_publication_rel_prexcept - 1] =
579 767 : BoolGetDatum(pri->except);
580 :
581 : /* Add qualifications, if available */
582 767 : if (pri->whereClause != NULL)
583 219 : values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
584 : else
585 548 : nulls[Anum_pg_publication_rel_prqual - 1] = true;
586 :
587 : /* Add column list, if available */
588 767 : if (pri->columns)
589 212 : values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(attnumstoint2vector(attnums));
590 : else
591 555 : nulls[Anum_pg_publication_rel_prattrs - 1] = true;
592 :
593 767 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
594 :
595 : /* Insert tuple into catalog. */
596 767 : CatalogTupleInsert(rel, tup);
597 767 : heap_freetuple(tup);
598 :
599 : /* Register dependencies as needed */
600 767 : ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
601 :
602 : /* Add dependency on the publication */
603 767 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
604 767 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
605 :
606 : /* Add dependency on the relation */
607 767 : ObjectAddressSet(referenced, RelationRelationId, relid);
608 767 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
609 :
610 : /* Add dependency on the objects mentioned in the qualifications */
611 767 : if (pri->whereClause)
612 219 : recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
613 : DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
614 : false);
615 :
616 : /* Add dependency on the columns, if any are listed */
617 767 : i = -1;
618 1132 : while ((i = bms_next_member(attnums, i)) >= 0)
619 : {
620 365 : ObjectAddressSubSet(referenced, RelationRelationId, relid, i);
621 365 : recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
622 : }
623 :
624 : /* Close the table. */
625 767 : table_close(rel, RowExclusiveLock);
626 :
627 : /*
628 : * Determine whether EXCEPT tables require explicit relcache invalidation.
629 : *
630 : * For CREATE PUBLICATION with EXCEPT tables, invalidation is skipped
631 : * here, as CreatePublication() function invalidates all relations as part
632 : * of defining a FOR ALL TABLES publication.
633 : *
634 : * For ALTER PUBLICATION, invalidation is needed only when adding an
635 : * EXCEPT table to a publication already marked as ALL TABLES. For
636 : * publications that were originally empty or defined as ALL SEQUENCES and
637 : * are being converted to ALL TABLES, invalidation is skipped here, as
638 : * AlterPublicationAllFlags() function invalidates all relations while
639 : * marking the publication as ALL TABLES publication.
640 : */
641 776 : inval_except_table = (alter_stmt != NULL) && pub->alltables &&
642 9 : (alter_stmt->for_all_tables && pri->except);
643 :
644 767 : if (!pri->except || inval_except_table)
645 : {
646 : /*
647 : * Invalidate relcache so that publication info is rebuilt.
648 : *
649 : * For the partitioned tables, we must invalidate all partitions
650 : * contained in the respective partition hierarchies, not just the one
651 : * explicitly mentioned in the publication. This is required because
652 : * we implicitly publish the child tables when the parent table is
653 : * published.
654 : */
655 699 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
656 : relid);
657 :
658 699 : InvalidatePublicationRels(relids);
659 : }
660 :
661 767 : return myself;
662 : }
663 :
664 : /*
665 : * pub_collist_validate
666 : * Process and validate the 'columns' list and ensure the columns are all
667 : * valid to use for a publication. Checks for and raises an ERROR for
668 : * any unknown columns, system columns, duplicate columns, or virtual
669 : * generated columns.
670 : *
671 : * Looks up each column's attnum and returns a 0-based Bitmapset of the
672 : * corresponding attnums.
673 : */
674 : Bitmapset *
675 1073 : pub_collist_validate(Relation targetrel, List *columns)
676 : {
677 1073 : Bitmapset *set = NULL;
678 : ListCell *lc;
679 1073 : TupleDesc tupdesc = RelationGetDescr(targetrel);
680 :
681 1618 : foreach(lc, columns)
682 : {
683 569 : char *colname = strVal(lfirst(lc));
684 569 : AttrNumber attnum = get_attnum(RelationGetRelid(targetrel), colname);
685 :
686 569 : if (attnum == InvalidAttrNumber)
687 4 : ereport(ERROR,
688 : errcode(ERRCODE_UNDEFINED_COLUMN),
689 : errmsg("column \"%s\" of relation \"%s\" does not exist",
690 : colname, RelationGetRelationName(targetrel)));
691 :
692 565 : if (!AttrNumberIsForUserDefinedAttr(attnum))
693 8 : ereport(ERROR,
694 : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
695 : errmsg("cannot use system column \"%s\" in publication column list",
696 : colname));
697 :
698 557 : if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
699 4 : ereport(ERROR,
700 : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
701 : errmsg("cannot use virtual generated column \"%s\" in publication column list",
702 : colname));
703 :
704 553 : if (bms_is_member(attnum, set))
705 8 : ereport(ERROR,
706 : errcode(ERRCODE_DUPLICATE_OBJECT),
707 : errmsg("duplicate column \"%s\" in publication column list",
708 : colname));
709 :
710 545 : set = bms_add_member(set, attnum);
711 : }
712 :
713 1049 : return set;
714 : }
715 :
716 : /*
717 : * Transform a column list (represented by an array Datum) to a bitmapset.
718 : *
719 : * If columns isn't NULL, add the column numbers to that set.
720 : *
721 : * If mcxt isn't NULL, build the bitmapset in that context.
722 : */
723 : Bitmapset *
724 280 : pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
725 : {
726 280 : Bitmapset *result = columns;
727 : ArrayType *arr;
728 : int nelems;
729 : int16 *elems;
730 280 : MemoryContext oldcxt = NULL;
731 :
732 280 : arr = DatumGetArrayTypeP(pubcols);
733 280 : nelems = ARR_DIMS(arr)[0];
734 280 : elems = (int16 *) ARR_DATA_PTR(arr);
735 :
736 : /* If a memory context was specified, switch to it. */
737 280 : if (mcxt)
738 41 : oldcxt = MemoryContextSwitchTo(mcxt);
739 :
740 770 : for (int i = 0; i < nelems; i++)
741 490 : result = bms_add_member(result, elems[i]);
742 :
743 280 : if (mcxt)
744 41 : MemoryContextSwitchTo(oldcxt);
745 :
746 280 : return result;
747 : }
748 :
749 : /*
750 : * Returns a bitmap representing the columns of the specified table.
751 : *
752 : * Generated columns are included if include_gencols_type is
753 : * PUBLISH_GENCOLS_STORED.
754 : */
755 : Bitmapset *
756 9 : pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type)
757 : {
758 9 : Bitmapset *result = NULL;
759 9 : TupleDesc desc = RelationGetDescr(relation);
760 :
761 30 : for (int i = 0; i < desc->natts; i++)
762 : {
763 21 : Form_pg_attribute att = TupleDescAttr(desc, i);
764 :
765 21 : if (att->attisdropped)
766 1 : continue;
767 :
768 20 : if (att->attgenerated)
769 : {
770 : /* We only support replication of STORED generated cols. */
771 2 : if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
772 1 : continue;
773 :
774 : /* User hasn't requested to replicate STORED generated cols. */
775 1 : if (include_gencols_type != PUBLISH_GENCOLS_STORED)
776 1 : continue;
777 : }
778 :
779 18 : result = bms_add_member(result, att->attnum);
780 : }
781 :
782 9 : return result;
783 : }
784 :
785 : /*
786 : * Insert new publication / schema mapping.
787 : */
788 : ObjectAddress
789 175 : publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
790 : {
791 : Relation rel;
792 : HeapTuple tup;
793 : Datum values[Natts_pg_publication_namespace];
794 : bool nulls[Natts_pg_publication_namespace];
795 : Oid psschid;
796 175 : Publication *pub = GetPublication(pubid);
797 175 : List *schemaRels = NIL;
798 : ObjectAddress myself,
799 : referenced;
800 :
801 175 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
802 :
803 : /*
804 : * Check for duplicates. Note that this does not really prevent
805 : * duplicates, it's here just to provide nicer error message in common
806 : * case. The real protection is the unique key on the catalog.
807 : */
808 175 : if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
809 : ObjectIdGetDatum(schemaid),
810 : ObjectIdGetDatum(pubid)))
811 : {
812 12 : table_close(rel, RowExclusiveLock);
813 :
814 12 : if (if_not_exists)
815 8 : return InvalidObjectAddress;
816 :
817 4 : ereport(ERROR,
818 : (errcode(ERRCODE_DUPLICATE_OBJECT),
819 : errmsg("schema \"%s\" is already member of publication \"%s\"",
820 : get_namespace_name(schemaid), pub->name)));
821 : }
822 :
823 163 : check_publication_add_schema(schemaid);
824 :
825 : /* Form a tuple */
826 159 : memset(values, 0, sizeof(values));
827 159 : memset(nulls, false, sizeof(nulls));
828 :
829 159 : psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId,
830 : Anum_pg_publication_namespace_oid);
831 159 : values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid);
832 159 : values[Anum_pg_publication_namespace_pnpubid - 1] =
833 159 : ObjectIdGetDatum(pubid);
834 159 : values[Anum_pg_publication_namespace_pnnspid - 1] =
835 159 : ObjectIdGetDatum(schemaid);
836 :
837 159 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
838 :
839 : /* Insert tuple into catalog */
840 159 : CatalogTupleInsert(rel, tup);
841 159 : heap_freetuple(tup);
842 :
843 159 : ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid);
844 :
845 : /* Add dependency on the publication */
846 159 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
847 159 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
848 :
849 : /* Add dependency on the schema */
850 159 : ObjectAddressSet(referenced, NamespaceRelationId, schemaid);
851 159 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
852 :
853 : /* Close the table */
854 159 : table_close(rel, RowExclusiveLock);
855 :
856 : /*
857 : * Invalidate relcache so that publication info is rebuilt. See
858 : * publication_add_relation for why we need to consider all the
859 : * partitions.
860 : */
861 159 : schemaRels = GetSchemaPublicationRelations(schemaid,
862 : PUBLICATION_PART_ALL);
863 159 : InvalidatePublicationRels(schemaRels);
864 :
865 159 : return myself;
866 : }
867 :
868 : /*
869 : * Internal function to get the list of publication oids for a relation.
870 : *
871 : * If except_flag is true, returns the list of publication that specified the
872 : * relation in the EXCEPT clause; otherwise, returns the list of publications
873 : * in which relation is included.
874 : */
875 : static List *
876 16710 : get_relation_publications(Oid relid, bool except_flag)
877 : {
878 16710 : List *result = NIL;
879 : CatCList *pubrellist;
880 :
881 : /* Find all publications associated with the relation. */
882 16710 : pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
883 : ObjectIdGetDatum(relid));
884 18350 : for (int i = 0; i < pubrellist->n_members; i++)
885 : {
886 1640 : HeapTuple tup = &pubrellist->members[i]->tuple;
887 1640 : Form_pg_publication_rel pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
888 1640 : Oid pubid = pubrel->prpubid;
889 :
890 1640 : if (pubrel->prexcept == except_flag)
891 1169 : result = lappend_oid(result, pubid);
892 : }
893 :
894 16710 : ReleaseSysCacheList(pubrellist);
895 :
896 16710 : return result;
897 : }
898 :
899 : /*
900 : * Gets list of publication oids for a relation.
901 : */
902 : List *
903 9004 : GetRelationIncludedPublications(Oid relid)
904 : {
905 9004 : return get_relation_publications(relid, false);
906 : }
907 :
908 : /*
909 : * Gets list of publication oids which has relation in the EXCEPT clause.
910 : */
911 : List *
912 7706 : GetRelationExcludedPublications(Oid relid)
913 : {
914 7706 : return get_relation_publications(relid, true);
915 : }
916 :
917 : /*
918 : * Internal function to get the list of relation oids for a publication.
919 : *
920 : * If except_flag is true, returns the list of relations specified in the
921 : * EXCEPT clause of the publication; otherwise, returns the list of relations
922 : * included in the publication.
923 : */
924 : static List *
925 680 : get_publication_relations(Oid pubid, PublicationPartOpt pub_partopt,
926 : bool except_flag)
927 : {
928 : List *result;
929 : Relation pubrelsrel;
930 : ScanKeyData scankey;
931 : SysScanDesc scan;
932 : HeapTuple tup;
933 :
934 : /* Find all relations associated with the publication. */
935 680 : pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
936 :
937 680 : ScanKeyInit(&scankey,
938 : Anum_pg_publication_rel_prpubid,
939 : BTEqualStrategyNumber, F_OIDEQ,
940 : ObjectIdGetDatum(pubid));
941 :
942 680 : scan = systable_beginscan(pubrelsrel, PublicationRelPrpubidIndexId,
943 : true, NULL, 1, &scankey);
944 :
945 680 : result = NIL;
946 2000 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
947 : {
948 : Form_pg_publication_rel pubrel;
949 :
950 640 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
951 :
952 640 : if (except_flag == pubrel->prexcept)
953 640 : result = GetPubPartitionOptionRelations(result, pub_partopt,
954 : pubrel->prrelid);
955 : }
956 :
957 680 : systable_endscan(scan);
958 680 : table_close(pubrelsrel, AccessShareLock);
959 :
960 : /* Now sort and de-duplicate the result list */
961 680 : list_sort(result, list_oid_cmp);
962 680 : list_deduplicate_oid(result);
963 :
964 680 : return result;
965 : }
966 :
967 : /*
968 : * Gets list of relation oids that are associated with a publication.
969 : *
970 : * This should only be used FOR TABLE publications, the FOR ALL TABLES/SEQUENCES
971 : * should use GetAllPublicationRelations().
972 : */
973 : List *
974 610 : GetIncludedPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
975 : {
976 : Assert(!GetPublication(pubid)->alltables);
977 :
978 610 : return get_publication_relations(pubid, pub_partopt, false);
979 : }
980 :
981 : /*
982 : * Gets list of table oids that were specified in the EXCEPT clause for a
983 : * publication.
984 : *
985 : * This should only be used FOR ALL TABLES publications.
986 : */
987 : List *
988 70 : GetExcludedPublicationTables(Oid pubid, PublicationPartOpt pub_partopt)
989 : {
990 : Assert(GetPublication(pubid)->alltables);
991 :
992 70 : return get_publication_relations(pubid, pub_partopt, true);
993 : }
994 :
995 : /*
996 : * Gets list of publication oids for publications marked as FOR ALL TABLES.
997 : */
998 : List *
999 6008 : GetAllTablesPublications(void)
1000 : {
1001 : List *result;
1002 : Relation rel;
1003 : ScanKeyData scankey;
1004 : SysScanDesc scan;
1005 : HeapTuple tup;
1006 :
1007 : /* Find all publications that are marked as for all tables. */
1008 6008 : rel = table_open(PublicationRelationId, AccessShareLock);
1009 :
1010 6008 : ScanKeyInit(&scankey,
1011 : Anum_pg_publication_puballtables,
1012 : BTEqualStrategyNumber, F_BOOLEQ,
1013 : BoolGetDatum(true));
1014 :
1015 6008 : scan = systable_beginscan(rel, InvalidOid, false,
1016 : NULL, 1, &scankey);
1017 :
1018 6008 : result = NIL;
1019 6138 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
1020 : {
1021 130 : Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
1022 :
1023 130 : result = lappend_oid(result, oid);
1024 : }
1025 :
1026 6008 : systable_endscan(scan);
1027 6008 : table_close(rel, AccessShareLock);
1028 :
1029 6008 : return result;
1030 : }
1031 :
1032 : /*
1033 : * Gets list of all relations published by FOR ALL TABLES/SEQUENCES
1034 : * publication.
1035 : *
1036 : * If the publication publishes partition changes via their respective root
1037 : * partitioned tables, we must exclude partitions in favor of including the
1038 : * root partitioned tables. This is not applicable to FOR ALL SEQUENCES
1039 : * publication.
1040 : *
1041 : * For a FOR ALL TABLES publication, the returned list excludes tables mentioned
1042 : * in the EXCEPT clause.
1043 : */
1044 : List *
1045 55 : GetAllPublicationRelations(Oid pubid, char relkind, bool pubviaroot)
1046 : {
1047 : Relation classRel;
1048 : ScanKeyData key[1];
1049 : TableScanDesc scan;
1050 : HeapTuple tuple;
1051 55 : List *result = NIL;
1052 55 : List *exceptlist = NIL;
1053 :
1054 : Assert(!(relkind == RELKIND_SEQUENCE && pubviaroot));
1055 :
1056 : /* EXCEPT filtering applies only to relations, not sequences */
1057 55 : if (relkind == RELKIND_RELATION)
1058 49 : exceptlist = GetExcludedPublicationTables(pubid, pubviaroot ?
1059 49 : PUBLICATION_PART_ROOT :
1060 : PUBLICATION_PART_LEAF);
1061 :
1062 55 : classRel = table_open(RelationRelationId, AccessShareLock);
1063 :
1064 55 : ScanKeyInit(&key[0],
1065 : Anum_pg_class_relkind,
1066 : BTEqualStrategyNumber, F_CHAREQ,
1067 : CharGetDatum(relkind));
1068 :
1069 55 : scan = table_beginscan_catalog(classRel, 1, key);
1070 :
1071 3769 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1072 : {
1073 3714 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
1074 3714 : Oid relid = relForm->oid;
1075 :
1076 3714 : if (is_publishable_class(relid, relForm) &&
1077 135 : !(relForm->relispartition && pubviaroot) &&
1078 112 : !list_member_oid(exceptlist, relid))
1079 104 : result = lappend_oid(result, relid);
1080 : }
1081 :
1082 55 : table_endscan(scan);
1083 :
1084 55 : if (pubviaroot)
1085 : {
1086 4 : ScanKeyInit(&key[0],
1087 : Anum_pg_class_relkind,
1088 : BTEqualStrategyNumber, F_CHAREQ,
1089 : CharGetDatum(RELKIND_PARTITIONED_TABLE));
1090 :
1091 4 : scan = table_beginscan_catalog(classRel, 1, key);
1092 :
1093 21 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1094 : {
1095 17 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
1096 17 : Oid relid = relForm->oid;
1097 :
1098 17 : if (is_publishable_class(relid, relForm) &&
1099 17 : !relForm->relispartition &&
1100 13 : !list_member_oid(exceptlist, relid))
1101 12 : result = lappend_oid(result, relid);
1102 : }
1103 :
1104 4 : table_endscan(scan);
1105 : }
1106 :
1107 55 : table_close(classRel, AccessShareLock);
1108 55 : return result;
1109 : }
1110 :
1111 : /*
1112 : * Gets the list of schema oids for a publication.
1113 : *
1114 : * This should only be used FOR TABLES IN SCHEMA publications.
1115 : */
1116 : List *
1117 576 : GetPublicationSchemas(Oid pubid)
1118 : {
1119 576 : List *result = NIL;
1120 : Relation pubschsrel;
1121 : ScanKeyData scankey;
1122 : SysScanDesc scan;
1123 : HeapTuple tup;
1124 :
1125 : /* Find all schemas associated with the publication */
1126 576 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
1127 :
1128 576 : ScanKeyInit(&scankey,
1129 : Anum_pg_publication_namespace_pnpubid,
1130 : BTEqualStrategyNumber, F_OIDEQ,
1131 : ObjectIdGetDatum(pubid));
1132 :
1133 576 : scan = systable_beginscan(pubschsrel,
1134 : PublicationNamespacePnnspidPnpubidIndexId,
1135 : true, NULL, 1, &scankey);
1136 616 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
1137 : {
1138 : Form_pg_publication_namespace pubsch;
1139 :
1140 40 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1141 :
1142 40 : result = lappend_oid(result, pubsch->pnnspid);
1143 : }
1144 :
1145 576 : systable_endscan(scan);
1146 576 : table_close(pubschsrel, AccessShareLock);
1147 :
1148 576 : return result;
1149 : }
1150 :
1151 : /*
1152 : * Gets the list of publication oids associated with a specified schema.
1153 : */
1154 : List *
1155 8672 : GetSchemaPublications(Oid schemaid)
1156 : {
1157 8672 : List *result = NIL;
1158 : CatCList *pubschlist;
1159 : int i;
1160 :
1161 : /* Find all publications associated with the schema */
1162 8672 : pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
1163 : ObjectIdGetDatum(schemaid));
1164 8752 : for (i = 0; i < pubschlist->n_members; i++)
1165 : {
1166 80 : HeapTuple tup = &pubschlist->members[i]->tuple;
1167 80 : Oid pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
1168 :
1169 80 : result = lappend_oid(result, pubid);
1170 : }
1171 :
1172 8672 : ReleaseSysCacheList(pubschlist);
1173 :
1174 8672 : return result;
1175 : }
1176 :
1177 : /*
1178 : * Get the list of publishable relation oids for a specified schema.
1179 : */
1180 : List *
1181 310 : GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
1182 : {
1183 : Relation classRel;
1184 : ScanKeyData key[1];
1185 : TableScanDesc scan;
1186 : HeapTuple tuple;
1187 310 : List *result = NIL;
1188 :
1189 : Assert(OidIsValid(schemaid));
1190 :
1191 310 : classRel = table_open(RelationRelationId, AccessShareLock);
1192 :
1193 310 : ScanKeyInit(&key[0],
1194 : Anum_pg_class_relnamespace,
1195 : BTEqualStrategyNumber, F_OIDEQ,
1196 : ObjectIdGetDatum(schemaid));
1197 :
1198 : /* get all the relations present in the specified schema */
1199 310 : scan = table_beginscan_catalog(classRel, 1, key);
1200 17256 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1201 : {
1202 16946 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
1203 16946 : Oid relid = relForm->oid;
1204 : char relkind;
1205 :
1206 16946 : if (!is_publishable_class(relid, relForm))
1207 5839 : continue;
1208 :
1209 11107 : relkind = get_rel_relkind(relid);
1210 11107 : if (relkind == RELKIND_RELATION)
1211 9554 : result = lappend_oid(result, relid);
1212 1553 : else if (relkind == RELKIND_PARTITIONED_TABLE)
1213 : {
1214 495 : List *partitionrels = NIL;
1215 :
1216 : /*
1217 : * It is quite possible that some of the partitions are in a
1218 : * different schema than the parent table, so we need to get such
1219 : * partitions separately.
1220 : */
1221 495 : partitionrels = GetPubPartitionOptionRelations(partitionrels,
1222 : pub_partopt,
1223 : relForm->oid);
1224 495 : result = list_concat_unique_oid(result, partitionrels);
1225 : }
1226 : }
1227 :
1228 310 : table_endscan(scan);
1229 310 : table_close(classRel, AccessShareLock);
1230 310 : return result;
1231 : }
1232 :
1233 : /*
1234 : * Gets the list of all relations published by FOR TABLES IN SCHEMA
1235 : * publication.
1236 : */
1237 : List *
1238 283 : GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
1239 : {
1240 283 : List *result = NIL;
1241 283 : List *pubschemalist = GetPublicationSchemas(pubid);
1242 : ListCell *cell;
1243 :
1244 303 : foreach(cell, pubschemalist)
1245 : {
1246 20 : Oid schemaid = lfirst_oid(cell);
1247 20 : List *schemaRels = NIL;
1248 :
1249 20 : schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt);
1250 20 : result = list_concat(result, schemaRels);
1251 : }
1252 :
1253 283 : return result;
1254 : }
1255 :
1256 : /*
1257 : * Get publication using oid
1258 : *
1259 : * The Publication struct and its data are palloc'ed here.
1260 : */
1261 : Publication *
1262 4199 : GetPublication(Oid pubid)
1263 : {
1264 : HeapTuple tup;
1265 : Publication *pub;
1266 : Form_pg_publication pubform;
1267 :
1268 4199 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1269 4199 : if (!HeapTupleIsValid(tup))
1270 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1271 :
1272 4199 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1273 :
1274 4199 : pub = palloc_object(Publication);
1275 4199 : pub->oid = pubid;
1276 4199 : pub->name = pstrdup(NameStr(pubform->pubname));
1277 4199 : pub->alltables = pubform->puballtables;
1278 4199 : pub->allsequences = pubform->puballsequences;
1279 4199 : pub->pubactions.pubinsert = pubform->pubinsert;
1280 4199 : pub->pubactions.pubupdate = pubform->pubupdate;
1281 4199 : pub->pubactions.pubdelete = pubform->pubdelete;
1282 4199 : pub->pubactions.pubtruncate = pubform->pubtruncate;
1283 4199 : pub->pubviaroot = pubform->pubviaroot;
1284 4199 : pub->pubgencols_type = pubform->pubgencols;
1285 :
1286 4199 : ReleaseSysCache(tup);
1287 :
1288 4199 : return pub;
1289 : }
1290 :
1291 : /*
1292 : * Get Publication using name.
1293 : */
1294 : Publication *
1295 1789 : GetPublicationByName(const char *pubname, bool missing_ok)
1296 : {
1297 : Oid oid;
1298 :
1299 1789 : oid = get_publication_oid(pubname, missing_ok);
1300 :
1301 1789 : return OidIsValid(oid) ? GetPublication(oid) : NULL;
1302 : }
1303 :
1304 : /*
1305 : * A helper function for pg_get_publication_tables() to check whether the
1306 : * table with the given relid is published in the specified publication.
1307 : *
1308 : * This function evaluates the effective published OID based on the
1309 : * publish_via_partition_root setting, rather than just checking catalog entries
1310 : * (e.g., pg_publication_rel). For instance, when publish_via_partition_root is
1311 : * false, it returns false for a parent partitioned table and returns true
1312 : * for its leaf partitions, even if the parent is the one explicitly added
1313 : * to the publication.
1314 : *
1315 : * For performance reasons, this function avoids the overhead of constructing
1316 : * the complete list of published tables during the evaluation. It can execute
1317 : * quickly even when the publication contains a large number of relations.
1318 : *
1319 : * Note: this leaks memory for the ancestors list into the current memory
1320 : * context.
1321 : */
1322 : static bool
1323 990 : is_table_publishable_in_publication(Oid relid, Publication *pub)
1324 : {
1325 : bool relispartition;
1326 990 : List *ancestors = NIL;
1327 :
1328 : /*
1329 : * For non-pubviaroot publications, a partitioned table is never the
1330 : * effective published OID; only its leaf partitions can be.
1331 : */
1332 990 : if (!pub->pubviaroot && get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE)
1333 86 : return false;
1334 :
1335 904 : relispartition = get_rel_relispartition(relid);
1336 :
1337 904 : if (relispartition)
1338 170 : ancestors = get_partition_ancestors(relid);
1339 :
1340 904 : if (pub->alltables)
1341 : {
1342 : /*
1343 : * ALL TABLES with pubviaroot includes only regular tables or top-most
1344 : * partitioned tables -- never child partitions.
1345 : */
1346 204 : if (pub->pubviaroot && relispartition)
1347 12 : return false;
1348 :
1349 : /*
1350 : * For ALL TABLES publications, the table is published unless it
1351 : * appears in the EXCEPT clause. Only the top-most can appear in the
1352 : * EXCEPT clause, so exclusion must be evaluated at the top-most
1353 : * ancestor if it has. These publications store only EXCEPT'ed tables
1354 : * in pg_publication_rel, so checking existence is sufficient.
1355 : *
1356 : * Note that this existence check below would incorrectly return true
1357 : * (published) for partitions when pubviaroot is enabled; however,
1358 : * that case is already caught and returned false by the above check.
1359 : */
1360 192 : return !SearchSysCacheExists2(PUBLICATIONRELMAP,
1361 : ObjectIdGetDatum(ancestors
1362 : ? llast_oid(ancestors) : relid),
1363 : ObjectIdGetDatum(pub->oid));
1364 : }
1365 :
1366 : /*
1367 : * Non-ALL-TABLE publication cases.
1368 : *
1369 : * A table is published if it (or a containing schema) was explicitly
1370 : * added, or if it is a partition whose ancestor was added.
1371 : */
1372 :
1373 : /*
1374 : * If an ancestor is published, the partition's status depends on
1375 : * publish_via_partition_root value.
1376 : *
1377 : * If it's true, the ancestor's relation OID is the effective published
1378 : * OID, so the partition itself should be excluded (return false).
1379 : *
1380 : * If it's false, the partition is covered by its ancestor's presence in
1381 : * the publication, it should be included (return true).
1382 : */
1383 840 : if (relispartition &&
1384 140 : OidIsValid(GetTopMostAncestorInPublication(pub->oid, ancestors, NULL)))
1385 44 : return !pub->pubviaroot;
1386 :
1387 : /*
1388 : * Check whether the table is explicitly published via pg_publication_rel
1389 : * or pg_publication_namespace.
1390 : */
1391 656 : return (SearchSysCacheExists2(PUBLICATIONRELMAP,
1392 : ObjectIdGetDatum(relid),
1393 1026 : ObjectIdGetDatum(pub->oid)) ||
1394 370 : SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
1395 : ObjectIdGetDatum(get_rel_namespace(relid)),
1396 : ObjectIdGetDatum(pub->oid)));
1397 : }
1398 :
1399 : /*
1400 : * Helper function to get information of the tables in the given
1401 : * publication(s).
1402 : *
1403 : * If filter_by_relid is true, only the row(s) for target_relid is returned;
1404 : * if target_relid does not exist or is not part of the publications, zero
1405 : * rows are returned. If filter_by_relid is false, rows for all tables
1406 : * within the specified publications are returned and target_relid is
1407 : * ignored.
1408 : *
1409 : * Returns pubid, relid, column list, and row filter for each table.
1410 : */
1411 : static Datum
1412 1608 : pg_get_publication_tables(FunctionCallInfo fcinfo, ArrayType *pubnames,
1413 : Oid target_relid, bool filter_by_relid,
1414 : bool pub_missing_ok)
1415 : {
1416 : #define NUM_PUBLICATION_TABLES_ELEM 4
1417 : FuncCallContext *funcctx;
1418 1608 : List *table_infos = NIL;
1419 :
1420 : /* stuff done only on the first call of the function */
1421 1608 : if (SRF_IS_FIRSTCALL())
1422 : {
1423 : TupleDesc tupdesc;
1424 : MemoryContext oldcontext;
1425 : Datum *elems;
1426 : int nelems,
1427 : i;
1428 758 : bool viaroot = false;
1429 :
1430 : /* create a function context for cross-call persistence */
1431 758 : funcctx = SRF_FIRSTCALL_INIT();
1432 :
1433 : /*
1434 : * Preliminary check if the specified table can be published in the
1435 : * first place. If not, we can return early without checking the given
1436 : * publications and the table.
1437 : */
1438 758 : if (filter_by_relid && !is_publishable_table(target_relid))
1439 8 : SRF_RETURN_DONE(funcctx);
1440 :
1441 : /* switch to memory context appropriate for multiple function calls */
1442 750 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1443 :
1444 : /*
1445 : * Deconstruct the parameter into elements where each element is a
1446 : * publication name.
1447 : */
1448 750 : deconstruct_array_builtin(pubnames, TEXTOID, &elems, NULL, &nelems);
1449 :
1450 : /* Get Oids of tables from each publication. */
1451 1998 : for (i = 0; i < nelems; i++)
1452 : {
1453 : Publication *pub_elem;
1454 1248 : List *pub_elem_tables = NIL;
1455 : ListCell *lc;
1456 :
1457 1248 : pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]),
1458 : pub_missing_ok);
1459 :
1460 1248 : if (pub_elem == NULL)
1461 6 : continue;
1462 :
1463 1242 : if (filter_by_relid)
1464 : {
1465 : /* Check if the given table is published for the publication */
1466 990 : if (is_table_publishable_in_publication(target_relid, pub_elem))
1467 : {
1468 504 : pub_elem_tables = list_make1_oid(target_relid);
1469 : }
1470 : }
1471 : else
1472 : {
1473 : /*
1474 : * Publications support partitioned tables. If
1475 : * publish_via_partition_root is false, all changes are
1476 : * replicated using leaf partition identity and schema, so we
1477 : * only need those. Otherwise, get the partitioned table
1478 : * itself.
1479 : */
1480 252 : if (pub_elem->alltables)
1481 49 : pub_elem_tables = GetAllPublicationRelations(pub_elem->oid,
1482 : RELKIND_RELATION,
1483 49 : pub_elem->pubviaroot);
1484 : else
1485 : {
1486 : List *relids,
1487 : *schemarelids;
1488 :
1489 203 : relids = GetIncludedPublicationRelations(pub_elem->oid,
1490 203 : pub_elem->pubviaroot ?
1491 203 : PUBLICATION_PART_ROOT :
1492 : PUBLICATION_PART_LEAF);
1493 203 : schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
1494 203 : pub_elem->pubviaroot ?
1495 203 : PUBLICATION_PART_ROOT :
1496 : PUBLICATION_PART_LEAF);
1497 203 : pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
1498 : }
1499 : }
1500 :
1501 : /*
1502 : * Record the published table and the corresponding publication so
1503 : * that we can get row filters and column lists later.
1504 : *
1505 : * When a table is published by multiple publications, to obtain
1506 : * all row filters and column lists, the structure related to this
1507 : * table will be recorded multiple times.
1508 : */
1509 2122 : foreach(lc, pub_elem_tables)
1510 : {
1511 880 : published_rel *table_info = palloc_object(published_rel);
1512 :
1513 880 : table_info->relid = lfirst_oid(lc);
1514 880 : table_info->pubid = pub_elem->oid;
1515 880 : table_infos = lappend(table_infos, table_info);
1516 : }
1517 :
1518 : /* At least one publication is using publish_via_partition_root. */
1519 1242 : if (pub_elem->pubviaroot)
1520 249 : viaroot = true;
1521 : }
1522 :
1523 : /*
1524 : * If the publication publishes partition changes via their respective
1525 : * root partitioned tables, we must exclude partitions in favor of
1526 : * including the root partitioned tables. Otherwise, the function
1527 : * could return both the child and parent tables which could cause
1528 : * data of the child table to be double-published on the subscriber
1529 : * side.
1530 : */
1531 750 : if (viaroot)
1532 169 : filter_partitions(table_infos);
1533 :
1534 : /* Construct a tuple descriptor for the result rows. */
1535 750 : tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
1536 750 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
1537 : OIDOID, -1, 0);
1538 750 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
1539 : OIDOID, -1, 0);
1540 750 : TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
1541 : INT2VECTOROID, -1, 0);
1542 750 : TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
1543 : PG_NODE_TREEOID, -1, 0);
1544 :
1545 750 : TupleDescFinalize(tupdesc);
1546 750 : funcctx->tuple_desc = BlessTupleDesc(tupdesc);
1547 750 : funcctx->user_fctx = table_infos;
1548 :
1549 750 : MemoryContextSwitchTo(oldcontext);
1550 : }
1551 :
1552 : /* stuff done on every call of the function */
1553 1600 : funcctx = SRF_PERCALL_SETUP();
1554 1600 : table_infos = (List *) funcctx->user_fctx;
1555 :
1556 1600 : if (funcctx->call_cntr < list_length(table_infos))
1557 : {
1558 850 : HeapTuple pubtuple = NULL;
1559 : HeapTuple rettuple;
1560 : Publication *pub;
1561 850 : published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
1562 850 : Oid relid = table_info->relid;
1563 850 : Oid schemaid = get_rel_namespace(relid);
1564 850 : Datum values[NUM_PUBLICATION_TABLES_ELEM] = {0};
1565 850 : bool nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
1566 :
1567 : /*
1568 : * Form tuple with appropriate data.
1569 : */
1570 :
1571 850 : pub = GetPublication(table_info->pubid);
1572 :
1573 850 : values[0] = ObjectIdGetDatum(pub->oid);
1574 850 : values[1] = ObjectIdGetDatum(relid);
1575 :
1576 : /*
1577 : * We don't consider row filters or column lists for FOR ALL TABLES or
1578 : * FOR TABLES IN SCHEMA publications.
1579 : */
1580 850 : if (!pub->alltables &&
1581 577 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
1582 : ObjectIdGetDatum(schemaid),
1583 : ObjectIdGetDatum(pub->oid)))
1584 541 : pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
1585 : ObjectIdGetDatum(relid),
1586 : ObjectIdGetDatum(pub->oid));
1587 :
1588 850 : if (HeapTupleIsValid(pubtuple))
1589 : {
1590 : /* Lookup the column list attribute. */
1591 502 : values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1592 : Anum_pg_publication_rel_prattrs,
1593 : &(nulls[2]));
1594 :
1595 : /* Null indicates no filter. */
1596 502 : values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1597 : Anum_pg_publication_rel_prqual,
1598 : &(nulls[3]));
1599 : }
1600 : else
1601 : {
1602 348 : nulls[2] = true;
1603 348 : nulls[3] = true;
1604 : }
1605 :
1606 : /* Show all columns when the column list is not specified. */
1607 850 : if (nulls[2])
1608 : {
1609 754 : Relation rel = table_open(relid, AccessShareLock);
1610 754 : int nattnums = 0;
1611 : int16 *attnums;
1612 754 : TupleDesc desc = RelationGetDescr(rel);
1613 : int i;
1614 :
1615 754 : attnums = palloc_array(int16, desc->natts);
1616 :
1617 1990 : for (i = 0; i < desc->natts; i++)
1618 : {
1619 1236 : Form_pg_attribute att = TupleDescAttr(desc, i);
1620 :
1621 1236 : if (att->attisdropped)
1622 4 : continue;
1623 :
1624 1232 : if (att->attgenerated)
1625 : {
1626 : /* We only support replication of STORED generated cols. */
1627 22 : if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
1628 10 : continue;
1629 :
1630 : /*
1631 : * User hasn't requested to replicate STORED generated
1632 : * cols.
1633 : */
1634 12 : if (pub->pubgencols_type != PUBLISH_GENCOLS_STORED)
1635 9 : continue;
1636 : }
1637 :
1638 1213 : attnums[nattnums++] = att->attnum;
1639 : }
1640 :
1641 754 : if (nattnums > 0)
1642 : {
1643 750 : values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
1644 750 : nulls[2] = false;
1645 : }
1646 :
1647 754 : table_close(rel, AccessShareLock);
1648 : }
1649 :
1650 850 : rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
1651 :
1652 850 : SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
1653 : }
1654 :
1655 750 : SRF_RETURN_DONE(funcctx);
1656 : }
1657 :
1658 : Datum
1659 550 : pg_get_publication_tables_a(PG_FUNCTION_ARGS)
1660 : {
1661 : /*
1662 : * Get information for all tables in the given publications.
1663 : * filter_by_relid is false so all tables are returned; pub_missing_ok is
1664 : * false for backward compatibility.
1665 : */
1666 550 : return pg_get_publication_tables(fcinfo, PG_GETARG_ARRAYTYPE_P(0),
1667 : InvalidOid, false, false);
1668 : }
1669 :
1670 : Datum
1671 1058 : pg_get_publication_tables_b(PG_FUNCTION_ARGS)
1672 : {
1673 : /*
1674 : * Get information for the specified table in the given publications. The
1675 : * SQL-level function is declared STRICT, so target_relid is guaranteed to
1676 : * be non-NULL here.
1677 : */
1678 1058 : return pg_get_publication_tables(fcinfo, PG_GETARG_ARRAYTYPE_P(0),
1679 : PG_GETARG_OID(1), true, true);
1680 : }
1681 :
1682 : /*
1683 : * Returns Oids of sequences in a publication.
1684 : */
1685 : Datum
1686 234 : pg_get_publication_sequences(PG_FUNCTION_ARGS)
1687 : {
1688 : FuncCallContext *funcctx;
1689 234 : List *sequences = NIL;
1690 :
1691 : /* stuff done only on the first call of the function */
1692 234 : if (SRF_IS_FIRSTCALL())
1693 : {
1694 217 : char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1695 : Publication *publication;
1696 : MemoryContext oldcontext;
1697 :
1698 : /* create a function context for cross-call persistence */
1699 217 : funcctx = SRF_FIRSTCALL_INIT();
1700 :
1701 : /* switch to memory context appropriate for multiple function calls */
1702 217 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1703 :
1704 217 : publication = GetPublicationByName(pubname, false);
1705 :
1706 217 : if (publication->allsequences)
1707 6 : sequences = GetAllPublicationRelations(publication->oid,
1708 : RELKIND_SEQUENCE,
1709 : false);
1710 :
1711 217 : funcctx->user_fctx = sequences;
1712 :
1713 217 : MemoryContextSwitchTo(oldcontext);
1714 : }
1715 :
1716 : /* stuff done on every call of the function */
1717 234 : funcctx = SRF_PERCALL_SETUP();
1718 234 : sequences = (List *) funcctx->user_fctx;
1719 :
1720 234 : if (funcctx->call_cntr < list_length(sequences))
1721 : {
1722 17 : Oid relid = list_nth_oid(sequences, funcctx->call_cntr);
1723 :
1724 17 : SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
1725 : }
1726 :
1727 217 : SRF_RETURN_DONE(funcctx);
1728 : }
|