Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_publication.c
4 : * publication C API manipulation
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/catalog/pg_publication.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/heapam.h"
19 : #include "access/htup_details.h"
20 : #include "access/tableam.h"
21 : #include "catalog/catalog.h"
22 : #include "catalog/dependency.h"
23 : #include "catalog/indexing.h"
24 : #include "catalog/namespace.h"
25 : #include "catalog/objectaddress.h"
26 : #include "catalog/partition.h"
27 : #include "catalog/pg_inherits.h"
28 : #include "catalog/pg_namespace.h"
29 : #include "catalog/pg_publication.h"
30 : #include "catalog/pg_publication_namespace.h"
31 : #include "catalog/pg_publication_rel.h"
32 : #include "catalog/pg_type.h"
33 : #include "commands/publicationcmds.h"
34 : #include "funcapi.h"
35 : #include "utils/array.h"
36 : #include "utils/builtins.h"
37 : #include "utils/catcache.h"
38 : #include "utils/fmgroids.h"
39 : #include "utils/lsyscache.h"
40 : #include "utils/rel.h"
41 : #include "utils/syscache.h"
42 :
43 : /* Records association between publication and published table.
   : * Used as the element type of the table_infos lists examined by
   : * is_ancestor_member_tableinfos() and filter_partitions(). */
44 : typedef struct
45 : {
46 : Oid relid; /* OID of published table */
47 : Oid pubid; /* OID of publication that publishes this
48 : * table. */
49 : } published_rel;
50 :
51 : /*
52 : * Check if relation can be in given publication and throw an appropriate
53 : * error if not.
   : *
   : * pri->relation is the relation to check. pri->except selects the wording
   : * of the error message and enables the partition check. Every rejection
   : * is raised with ERRCODE_INVALID_PARAMETER_VALUE. Rejected are: a
   : * partition named in an EXCEPT clause, any relkind other than
   : * RELKIND_RELATION or RELKIND_PARTITIONED_TABLE, catalog relations, and
   : * temporary or unlogged relations.
54 : */
55 : static void
56 754 : check_publication_add_relation(PublicationRelInfo *pri)
57 : {
58 754 : Relation targetrel = pri->relation;
59 : const char *errormsg;
60 :
61 754 : if (pri->except)
62 53 : errormsg = gettext_noop("cannot use publication EXCEPT clause for relation \"%s\"");
63 : else
64 701 : errormsg = gettext_noop("cannot add relation \"%s\" to publication");
65 :
66 : /* A relation named in an EXCEPT clause must not itself be a partition */
67 754 : if (pri->except && targetrel->rd_rel->relispartition)
68 4 : ereport(ERROR,
69 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
70 : errmsg(errormsg, RelationGetRelationName(targetrel)),
71 : errdetail("This operation is not supported for individual partitions.")));
72 :
73 : /* Must be a regular or partitioned table */
74 750 : if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
75 95 : RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
76 9 : ereport(ERROR,
77 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
78 : errmsg(errormsg, RelationGetRelationName(targetrel)),
79 : errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind)));
80 :
81 : /* Can't be system table */
82 741 : if (IsCatalogRelation(targetrel))
83 4 : ereport(ERROR,
84 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
85 : errmsg(errormsg, RelationGetRelationName(targetrel)),
86 : errdetail("This operation is not supported for system tables.")));
87 :
88 : /* UNLOGGED and TEMP relations cannot be part of publication. */
89 737 : if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
90 4 : ereport(ERROR,
91 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
92 : errmsg(errormsg, RelationGetRelationName(targetrel)),
93 : errdetail("This operation is not supported for temporary tables.")));
94 733 : else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
95 4 : ereport(ERROR,
96 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
97 : errmsg(errormsg, RelationGetRelationName(targetrel)),
98 : errdetail("This operation is not supported for unlogged tables.")));
99 729 : }
100 :
101 : /*
102 : * Check if schema can be in given publication and throw appropriate error if
103 : * not.
    : *
    : * Rejects the catalog and TOAST namespaces as well as any temporary
    : * namespace. Both cases are reported with
    : * ERRCODE_INVALID_PARAMETER_VALUE and name the schema via
    : * get_namespace_name().
104 : */
105 : static void
106 159 : check_publication_add_schema(Oid schemaid)
107 : {
108 : /* Can't be system namespace */
109 159 : if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid))
110 4 : ereport(ERROR,
111 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
112 : errmsg("cannot add schema \"%s\" to publication",
113 : get_namespace_name(schemaid)),
114 : errdetail("This operation is not supported for system schemas.")));
115 :
116 : /* Can't be temporary namespace */
117 155 : if (isAnyTempNamespace(schemaid))
118 0 : ereport(ERROR,
119 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
120 : errmsg("cannot add schema \"%s\" to publication",
121 : get_namespace_name(schemaid)),
122 : errdetail("Temporary schemas cannot be replicated.")));
123 155 : }
124 :
125 : /*
126 : * Returns whether the relation represented by oid and Form_pg_class entry
127 : * is publishable.
128 : *
129 : * Does same checks as check_publication_add_relation() above except for
130 : * RELKIND_SEQUENCE, but does not need relation to be opened and also does
131 : * not throw errors. Here, the additional check is to support ALL SEQUENCES
132 : * publication.
133 : *
    : * The result is true only when relkind is RELKIND_RELATION,
    : * RELKIND_PARTITIONED_TABLE or RELKIND_SEQUENCE, IsCatalogRelationOid(relid)
    : * is false, relpersistence is RELPERSISTENCE_PERMANENT, and
    : * relid >= FirstNormalObjectId.
    : *
134 : * XXX This also excludes all tables with relid < FirstNormalObjectId,
135 : * ie all tables created during initdb. This mainly affects the preinstalled
136 : * information_schema. IsCatalogRelationOid() only excludes tables with
137 : * relid < FirstUnpinnedObjectId, making that test rather redundant,
138 : * but really we should get rid of the FirstNormalObjectId test not
139 : * IsCatalogRelationOid. We can't do so today because we don't want
140 : * information_schema tables to be considered publishable; but this test
141 : * is really inadequate for that, since the information_schema could be
142 : * dropped and reloaded and then it'll be considered publishable. The best
143 : * long-term solution may be to add a "relispublishable" bool to pg_class,
144 : * and depend on that instead of OID checks.
145 : */
146 : static bool
147 312857 : is_publishable_class(Oid relid, Form_pg_class reltuple)
148 : {
149 321079 : return (reltuple->relkind == RELKIND_RELATION ||
150 8222 : reltuple->relkind == RELKIND_PARTITIONED_TABLE ||
151 7216 : reltuple->relkind == RELKIND_SEQUENCE) &&
152 307012 : !IsCatalogRelationOid(relid) &&
153 625714 : reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
154 : relid >= FirstNormalObjectId;
155 : }
156 :
157 : /*
158 : * Another variant of is_publishable_class(), taking a Relation.
    : *
    : * Simply forwards the relation's OID and its rd_rel entry to
    : * is_publishable_class() and returns that result.
159 : */
160 : bool
161 275886 : is_publishable_relation(Relation rel)
162 : {
163 275886 : return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
164 : }
165 :
166 : /*
167 : * SQL-callable variant of the above
168 : *
169 : * This returns null when the relation does not exist. This is intended to be
170 : * used for example in psql to avoid gratuitous errors when there are
171 : * concurrent catalog changes.
    : *
    : * Argument 0 is the relation OID. The pg_class row is looked up in the
    : * RELOID syscache, handed to is_publishable_class(), and released before
    : * the boolean result is returned.
172 : */
173 : Datum
174 4272 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
175 : {
176 4272 : Oid relid = PG_GETARG_OID(0);
177 : HeapTuple tuple;
178 : bool result;
179 :
180 4272 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
181 4272 : if (!HeapTupleIsValid(tuple))
182 0 : PG_RETURN_NULL();
183 4272 : result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
184 4272 : ReleaseSysCache(tuple);
185 4272 : PG_RETURN_BOOL(result);
186 : }
187 :
188 : /*
189 : * Returns true if the ancestor is in the list of published relations.
190 : * Otherwise, returns false.
    : *
    : * table_infos is treated as a list of published_rel pointers; only the
    : * relid field of each entry is compared against 'ancestor', using a
    : * linear walk over the list.
191 : */
192 : static bool
193 107 : is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
194 : {
195 : ListCell *lc;
196 :
197 355 : foreach(lc, table_infos)
198 : {
199 292 : Oid relid = ((published_rel *) lfirst(lc))->relid;
200 :
201 292 : if (relid == ancestor)
202 44 : return true;
203 : }
204 :
205 63 : return false;
206 : }
207 :
208 : /*
209 : * Filter out the partitions whose parent tables are also present in the list.
    : *
    : * table_infos is a list of published_rel pointers. For every entry whose
    : * relation is a partition, its ancestors are fetched with
    : * get_partition_ancestors(); the entry is deleted from the list as soon as
    : * one ancestor is found in table_infos (matched by relid only).
    : *
    : * NOTE(review): the value returned by foreach_delete_current() is stored
    : * only in the local parameter and the function returns void, so this
    : * presumably relies on the list being modified in place -- confirm
    : * against the List implementation.
210 : */
211 : static void
212 185 : filter_partitions(List *table_infos)
213 : {
214 : ListCell *lc;
215 :
216 539 : foreach(lc, table_infos)
217 : {
218 354 : bool skip = false;
219 354 : List *ancestors = NIL;
220 : ListCell *lc2;
221 354 : published_rel *table_info = (published_rel *) lfirst(lc);
222 :
223 354 : if (get_rel_relispartition(table_info->relid))
224 107 : ancestors = get_partition_ancestors(table_info->relid);
225 :
226 417 : foreach(lc2, ancestors)
227 : {
228 107 : Oid ancestor = lfirst_oid(lc2);
229 :
230 107 : if (is_ancestor_member_tableinfos(ancestor, table_infos))
231 : {
232 44 : skip = true;
233 44 : break;
234 : }
235 : }
236 :
237 354 : if (skip)
238 44 : table_infos = foreach_delete_current(table_infos, lc);
239 : }
240 185 : }
241 :
242 : /*
243 : * Returns true if any schema is associated with the publication, false if no
244 : * schema is associated with the publication.
    : *
    : * Scans pg_publication_namespace under AccessShareLock with a single scan
    : * key on pnpubid and fetches only the first tuple; the result is whether
    : * that tuple is valid. The scan is ended and the catalog closed before
    : * returning.
245 : */
246 : bool
247 211 : is_schema_publication(Oid pubid)
248 : {
249 : Relation pubschsrel;
250 : ScanKeyData scankey;
251 : SysScanDesc scan;
252 : HeapTuple tup;
253 211 : bool result = false;
254 :
255 211 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
256 211 : ScanKeyInit(&scankey,
257 : Anum_pg_publication_namespace_pnpubid,
258 : BTEqualStrategyNumber, F_OIDEQ,
259 : ObjectIdGetDatum(pubid));
260 :
261 211 : scan = systable_beginscan(pubschsrel,
262 : PublicationNamespacePnnspidPnpubidIndexId,
263 : true, NULL, 1, &scankey);
264 211 : tup = systable_getnext(scan);
265 211 : result = HeapTupleIsValid(tup);
266 :
267 211 : systable_endscan(scan);
268 211 : table_close(pubschsrel, AccessShareLock);
269 :
270 211 : return result;
271 : }
272 :
273 : /*
274 : * Returns true if the publication has explicitly included relations (i.e.,
275 : * relations not marked as EXCEPT).
    : *
    : * Only the first pg_publication_rel tuple found for pubid is examined.
    : * The result is false when no tuple exists or when that tuple has
    : * prexcept set.
276 : */
277 : bool
278 45 : is_table_publication(Oid pubid)
279 : {
280 : Relation pubrelsrel;
281 : ScanKeyData scankey;
282 : SysScanDesc scan;
283 : HeapTuple tup;
284 45 : bool result = false;
285 :
286 45 : pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
287 45 : ScanKeyInit(&scankey,
288 : Anum_pg_publication_rel_prpubid,
289 : BTEqualStrategyNumber, F_OIDEQ,
290 : ObjectIdGetDatum(pubid));
291 :
292 45 : scan = systable_beginscan(pubrelsrel,
293 : PublicationRelPrpubidIndexId,
294 : true, NULL, 1, &scankey);
295 45 : tup = systable_getnext(scan);
296 45 : if (HeapTupleIsValid(tup))
297 : {
298 : Form_pg_publication_rel pubrel;
299 :
300 21 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
301 :
302 : /*
303 : * For any publication, pg_publication_rel contains either only EXCEPT
304 : * entries or only explicitly included tables. Therefore, examining
305 : * the first tuple is sufficient to determine table inclusion.
306 : */
307 21 : result = !pubrel->prexcept;
308 : }
309 :
310 45 : systable_endscan(scan);
311 45 : table_close(pubrelsrel, AccessShareLock);
312 :
313 45 : return result;
314 : }
315 :
316 : /*
317 : * Returns true if the relation has a column list associated with the
318 : * publication, false otherwise.
319 : *
320 : * If a column list is found, the corresponding bitmap is returned through the
321 : * cols parameter, if provided. The bitmap is constructed within the given
322 : * memory context (mcxt).
    : *
    : * Returns false without any catalog lookup when pub->alltables is set.
    : * Otherwise the (relid, pub->oid) entry is looked up in the
    : * PUBLICATIONRELMAP syscache; a missing entry or a null prattrs both yield
    : * false. When cols is given, the current value of *cols is passed as the
    : * starting set to pub_collist_to_bitmapset(), so members already present
    : * are kept and the column list is added to them.
323 : */
324 : bool
325 917 : check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt,
326 : Bitmapset **cols)
327 : {
328 : HeapTuple cftuple;
329 917 : bool found = false;
330 :
331 917 : if (pub->alltables)
332 206 : return false;
333 :
334 711 : cftuple = SearchSysCache2(PUBLICATIONRELMAP,
335 : ObjectIdGetDatum(relid),
336 : ObjectIdGetDatum(pub->oid));
337 711 : if (HeapTupleIsValid(cftuple))
338 : {
339 : Datum cfdatum;
340 : bool isnull;
341 :
342 : /* Lookup the column list attribute. */
343 653 : cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
344 : Anum_pg_publication_rel_prattrs, &isnull);
345 :
346 : /* Was a column list found? */
347 653 : if (!isnull)
348 : {
349 : /* Build the column list bitmap in the given memory context. */
350 193 : if (cols)
351 190 : *cols = pub_collist_to_bitmapset(*cols, cfdatum, mcxt);
352 :
353 193 : found = true;
354 : }
355 :
356 653 : ReleaseSysCache(cftuple);
357 : }
358 :
359 711 : return found;
360 : }
361 :
362 : /*
363 : * Gets the relations based on the publication partition option for a specified
364 : * relation.
    : *
    : * 'result' is an OID list that is extended and returned. If relid is not
    : * a partitioned table, or pub_partopt is PUBLICATION_PART_ROOT, relid
    : * itself is appended. Otherwise find_all_inheritors() is called (with
    : * NoLock) and either its whole output is concatenated
    : * (PUBLICATION_PART_ALL) or only the members that are not partitioned
    : * tables are appended (PUBLICATION_PART_LEAF). Any other option value
    : * trips an assertion.
365 : */
366 : List *
367 3503 : GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
368 : Oid relid)
369 : {
370 3503 : if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
371 : pub_partopt != PUBLICATION_PART_ROOT)
372 755 : {
373 755 : List *all_parts = find_all_inheritors(relid, NoLock,
374 : NULL);
375 :
376 755 : if (pub_partopt == PUBLICATION_PART_ALL)
377 650 : result = list_concat(result, all_parts);
378 105 : else if (pub_partopt == PUBLICATION_PART_LEAF)
379 : {
380 : ListCell *lc;
381 :
382 387 : foreach(lc, all_parts)
383 : {
384 282 : Oid partOid = lfirst_oid(lc);
385 :
386 282 : if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
387 174 : result = lappend_oid(result, partOid);
388 : }
389 : }
390 : else
391 : Assert(false);
392 : }
393 : else
394 2748 : result = lappend_oid(result, relid);
395 :
396 3503 : return result;
397 : }
398 :
399 : /*
400 : * Returns the relid of the topmost ancestor that is published via this
401 : * publication if any and set its ancestor level to ancestor_level,
402 : * otherwise returns InvalidOid.
403 : *
404 : * The ancestor_level value allows us to compare the results for multiple
405 : * publications, and decide which value is higher up.
406 : *
407 : * Note that the list of ancestors should be ordered such that the topmost
408 : * ancestor is at the end of the list.
    : *
    : * An ancestor counts as published when puboid appears in
    : * GetRelationIncludedPublications() for it or, failing that, in
    : * GetSchemaPublications() for its namespace. The level is the 1-based
    : * position of the ancestor within 'ancestors'; later matches overwrite
    : * earlier ones. ancestor_level may be NULL, and *ancestor_level is left
    : * untouched when no ancestor matches.
409 : */
410 : Oid
411 286 : GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
412 : {
413 : ListCell *lc;
414 286 : Oid topmost_relid = InvalidOid;
415 286 : int level = 0;
416 :
417 : /*
418 : * Find the "topmost" ancestor that is in this publication.
419 : */
420 580 : foreach(lc, ancestors)
421 : {
422 294 : Oid ancestor = lfirst_oid(lc);
423 294 : List *apubids = GetRelationIncludedPublications(ancestor);
424 294 : List *aschemaPubids = NIL;
425 :
426 294 : level++;
427 :
428 294 : if (list_member_oid(apubids, puboid))
429 : {
430 187 : topmost_relid = ancestor;
431 :
432 187 : if (ancestor_level)
433 43 : *ancestor_level = level;
434 : }
435 : else
436 : {
437 107 : aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
438 107 : if (list_member_oid(aschemaPubids, puboid))
439 : {
440 5 : topmost_relid = ancestor;
441 :
442 5 : if (ancestor_level)
443 5 : *ancestor_level = level;
444 : }
445 : }
446 :
447 294 : list_free(apubids);
448 294 : list_free(aschemaPubids);
449 : }
450 :
451 286 : return topmost_relid;
452 : }
453 :
454 : /*
455 : * attnumstoint2vector
456 : * Convert a Bitmapset of AttrNumbers into an int2vector.
457 : *
458 : * AttrNumber numbers are 0-based, i.e., not offset by
459 : * FirstLowInvalidHeapAttributeNumber.
    : *
    : * The vector is created with buildint2vector() sized to
    : * bms_num_members(attrs) and filled in the order in which
    : * bms_next_member() yields the members. Each member is asserted to fit
    : * in an int16 before being stored.
460 : */
461 : static int2vector *
462 208 : attnumstoint2vector(Bitmapset *attrs)
463 : {
464 : int2vector *result;
465 208 : int n = bms_num_members(attrs);
466 208 : int i = -1;
467 208 : int j = 0;
468 :
469 208 : result = buildint2vector(NULL, n);
470 :
471 565 : while ((i = bms_next_member(attrs, i)) >= 0)
472 : {
473 : Assert(i <= PG_INT16_MAX);
474 :
475 357 : result->values[j++] = (int16) i;
476 : }
477 :
478 208 : return result;
479 : }
480 :
481 : /*
482 : * Insert new publication / relation mapping.
    : *
    : * pubid is the publication and pri describes the relation together with
    : * its optional row filter (whereClause), column list (columns) and EXCEPT
    : * flag. alter_stmt may be NULL; when given, its for_all_tables flag takes
    : * part in the invalidation decision at the end.
    : *
    : * If the mapping already exists, InvalidObjectAddress is returned when
    : * if_not_exists is true; otherwise an ERRCODE_DUPLICATE_OBJECT error is
    : * raised. On success the address of the new pg_publication_rel row is
    : * returned.
    : *
    : * Recorded dependencies: AUTO on the publication and on the relation,
    : * NORMAL on objects referenced by the row filter and on every listed
    : * column.
483 : */
484 : ObjectAddress
485 772 : publication_add_relation(Oid pubid, PublicationRelInfo *pri,
486 : bool if_not_exists, AlterPublicationStmt *alter_stmt)
487 : {
488 : Relation rel;
489 : HeapTuple tup;
490 : Datum values[Natts_pg_publication_rel];
491 : bool nulls[Natts_pg_publication_rel];
492 772 : Relation targetrel = pri->relation;
493 772 : Oid relid = RelationGetRelid(targetrel);
494 : Oid pubreloid;
495 : Bitmapset *attnums;
496 772 : Publication *pub = GetPublication(pubid);
497 : ObjectAddress myself,
498 : referenced;
499 772 : List *relids = NIL;
500 : int i;
501 : bool inval_except_table;
502 :
503 772 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
504 :
505 : /*
506 : * Check for duplicates. Note that this does not really prevent
507 : * duplicates, it's here just to provide nicer error message in common
508 : * case. The real protection is the unique key on the catalog.
509 : */
510 772 : if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
511 : ObjectIdGetDatum(pubid)))
512 : {
513 18 : table_close(rel, RowExclusiveLock);
514 :
515 18 : if (if_not_exists)
516 14 : return InvalidObjectAddress;
517 :
518 4 : ereport(ERROR,
519 : (errcode(ERRCODE_DUPLICATE_OBJECT),
520 : errmsg("relation \"%s\" is already member of publication \"%s\"",
521 : RelationGetRelationName(targetrel), pub->name)));
522 : }
523 :
524 754 : check_publication_add_relation(pri);
525 :
526 : /* Validate and translate column names into a Bitmapset of attnums. */
527 729 : attnums = pub_collist_validate(pri->relation, pri->columns);
528 :
529 : /* Form a tuple. */
530 713 : memset(values, 0, sizeof(values));
531 713 : memset(nulls, false, sizeof(nulls));
532 :
533 713 : pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
534 : Anum_pg_publication_rel_oid);
535 713 : values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid);
536 713 : values[Anum_pg_publication_rel_prpubid - 1] =
537 713 : ObjectIdGetDatum(pubid);
538 713 : values[Anum_pg_publication_rel_prrelid - 1] =
539 713 : ObjectIdGetDatum(relid);
540 713 : values[Anum_pg_publication_rel_prexcept - 1] =
541 713 : BoolGetDatum(pri->except);
542 :
543 : /* Add qualifications, if available */
544 713 : if (pri->whereClause != NULL)
545 211 : values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
546 : else
547 502 : nulls[Anum_pg_publication_rel_prqual - 1] = true;
548 :
549 : /* Add column list, if available */
550 713 : if (pri->columns)
551 208 : values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(attnumstoint2vector(attnums));
552 : else
553 505 : nulls[Anum_pg_publication_rel_prattrs - 1] = true;
554 :
555 713 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
556 :
557 : /* Insert tuple into catalog. */
558 713 : CatalogTupleInsert(rel, tup);
559 713 : heap_freetuple(tup);
560 :
561 : /* Register dependencies as needed */
562 713 : ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
563 :
564 : /* Add dependency on the publication */
565 713 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
566 713 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
567 :
568 : /* Add dependency on the relation */
569 713 : ObjectAddressSet(referenced, RelationRelationId, relid);
570 713 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
571 :
572 : /* Add dependency on the objects mentioned in the qualifications */
573 713 : if (pri->whereClause)
574 211 : recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
575 : DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
576 : false);
577 :
578 : /* Add dependency on the columns, if any are listed */
579 713 : i = -1;
580 1070 : while ((i = bms_next_member(attnums, i)) >= 0)
581 : {
582 357 : ObjectAddressSubSet(referenced, RelationRelationId, relid, i);
583 357 : recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
584 : }
585 :
586 : /* Close the table. */
587 713 : table_close(rel, RowExclusiveLock);
588 :
589 : /*
590 : * Determine whether EXCEPT tables require explicit relcache invalidation.
591 : *
592 : * For CREATE PUBLICATION with EXCEPT tables, invalidation is skipped
593 : * here, as CreatePublication() function invalidates all relations as part
594 : * of defining a FOR ALL TABLES publication.
595 : *
596 : * For ALTER PUBLICATION, invalidation is needed only when adding an
597 : * EXCEPT table to a publication already marked as ALL TABLES. For
598 : * publications that were originally empty or defined as ALL SEQUENCES and
599 : * are being converted to ALL TABLES, invalidation is skipped here, as
600 : * AlterPublicationAllFlags() function invalidates all relations while
601 : * marking the publication as ALL TABLES publication.
602 : */
603 714 : inval_except_table = (alter_stmt != NULL) && pub->alltables &&
604 1 : (alter_stmt->for_all_tables && pri->except);
605 :
606 713 : if (!pri->except || inval_except_table)
607 : {
608 : /*
609 : * Invalidate relcache so that publication info is rebuilt.
610 : *
611 : * For the partitioned tables, we must invalidate all partitions
612 : * contained in the respective partition hierarchies, not just the one
613 : * explicitly mentioned in the publication. This is required because
614 : * we implicitly publish the child tables when the parent table is
615 : * published.
616 : */
617 665 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
618 : relid);
619 :
620 665 : InvalidatePublicationRels(relids);
621 : }
622 :
623 713 : return myself;
624 : }
625 :
626 : /*
627 : * pub_collist_validate
628 : * Process and validate the 'columns' list and ensure the columns are all
629 : * valid to use for a publication. Checks for and raises an ERROR for
630 : * any unknown columns, system columns, duplicate columns, or virtual
631 : * generated columns.
632 : *
633 : * Looks up each column's attnum and returns a 0-based Bitmapset of the
634 : * corresponding attnums.
    : *
    : * 'columns' is a list of string nodes holding column names. The result is
    : * NULL when the list is empty. Error codes used: ERRCODE_UNDEFINED_COLUMN
    : * for unknown names, ERRCODE_INVALID_COLUMN_REFERENCE for system and
    : * virtual generated columns, and ERRCODE_DUPLICATE_OBJECT for repeated
    : * names.
635 : */
636 : Bitmapset *
637 1007 : pub_collist_validate(Relation targetrel, List *columns)
638 : {
639 1007 : Bitmapset *set = NULL;
640 : ListCell *lc;
641 1007 : TupleDesc tupdesc = RelationGetDescr(targetrel);
642 :
643 1544 : foreach(lc, columns)
644 : {
645 561 : char *colname = strVal(lfirst(lc));
646 561 : AttrNumber attnum = get_attnum(RelationGetRelid(targetrel), colname);
647 :
648 561 : if (attnum == InvalidAttrNumber)
649 4 : ereport(ERROR,
650 : errcode(ERRCODE_UNDEFINED_COLUMN),
651 : errmsg("column \"%s\" of relation \"%s\" does not exist",
652 : colname, RelationGetRelationName(targetrel)));
653 :
654 557 : if (!AttrNumberIsForUserDefinedAttr(attnum))
655 8 : ereport(ERROR,
656 : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
657 : errmsg("cannot use system column \"%s\" in publication column list",
658 : colname));
659 :
660 549 : if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
661 4 : ereport(ERROR,
662 : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
663 : errmsg("cannot use virtual generated column \"%s\" in publication column list",
664 : colname));
665 :
666 545 : if (bms_is_member(attnum, set))
667 8 : ereport(ERROR,
668 : errcode(ERRCODE_DUPLICATE_OBJECT),
669 : errmsg("duplicate column \"%s\" in publication column list",
670 : colname));
671 :
672 537 : set = bms_add_member(set, attnum);
673 : }
674 :
675 983 : return set;
676 : }
677 :
678 : /*
679 : * Transform a column list (represented by an array Datum) to a bitmapset.
680 : *
681 : * If columns isn't NULL, add the column numbers to that set.
682 : *
683 : * If mcxt isn't NULL, build the bitmapset in that context.
    : *
    : * The array payload is read as int16 values and the element count is
    : * taken from the array's first dimension (NOTE(review): assumes a
    : * one-dimensional array -- confirm). When a context was switched to, the
    : * previous memory context is restored before returning the resulting
    : * set.
684 : */
685 : Bitmapset *
686 280 : pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
687 : {
688 280 : Bitmapset *result = columns;
689 : ArrayType *arr;
690 : int nelems;
691 : int16 *elems;
692 280 : MemoryContext oldcxt = NULL;
693 :
694 280 : arr = DatumGetArrayTypeP(pubcols);
695 280 : nelems = ARR_DIMS(arr)[0];
696 280 : elems = (int16 *) ARR_DATA_PTR(arr);
697 :
698 : /* If a memory context was specified, switch to it. */
699 280 : if (mcxt)
700 41 : oldcxt = MemoryContextSwitchTo(mcxt);
701 :
702 770 : for (int i = 0; i < nelems; i++)
703 490 : result = bms_add_member(result, elems[i]);
704 :
705 280 : if (mcxt)
706 41 : MemoryContextSwitchTo(oldcxt);
707 :
708 280 : return result;
709 : }
710 :
711 : /*
712 : * Returns a bitmap representing the columns of the specified table.
713 : *
714 : * Generated columns are included if include_gencols_type is
715 : * PUBLISH_GENCOLS_STORED.
    : *
    : * Dropped columns are always skipped. Generated columns that are not
    : * STORED are skipped regardless of include_gencols_type. The set members
    : * are the attnum values of the remaining attributes; the result is NULL
    : * if no column qualifies.
716 : */
717 : Bitmapset *
718 9 : pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type)
719 : {
720 9 : Bitmapset *result = NULL;
721 9 : TupleDesc desc = RelationGetDescr(relation);
722 :
723 30 : for (int i = 0; i < desc->natts; i++)
724 : {
725 21 : Form_pg_attribute att = TupleDescAttr(desc, i);
726 :
727 21 : if (att->attisdropped)
728 1 : continue;
729 :
730 20 : if (att->attgenerated)
731 : {
732 : /* We only support replication of STORED generated cols. */
733 2 : if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
734 1 : continue;
735 :
736 : /* User hasn't requested to replicate STORED generated cols. */
737 1 : if (include_gencols_type != PUBLISH_GENCOLS_STORED)
738 1 : continue;
739 : }
740 :
741 18 : result = bms_add_member(result, att->attnum);
742 : }
743 :
744 9 : return result;
745 : }
746 :
747 : /*
748 : * Insert new publication / schema mapping.
    : *
    : * If the (schemaid, pubid) mapping already exists, InvalidObjectAddress
    : * is returned when if_not_exists is true; otherwise an
    : * ERRCODE_DUPLICATE_OBJECT error is raised. The schema is then vetted by
    : * check_publication_add_schema(), a pg_publication_namespace row is
    : * inserted, and AUTO dependencies on both the publication and the schema
    : * are recorded. On success the address of the new row is returned.
749 : */
750 : ObjectAddress
751 171 : publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
752 : {
753 : Relation rel;
754 : HeapTuple tup;
755 : Datum values[Natts_pg_publication_namespace];
756 : bool nulls[Natts_pg_publication_namespace];
757 : Oid psschid;
758 171 : Publication *pub = GetPublication(pubid);
759 171 : List *schemaRels = NIL;
760 : ObjectAddress myself,
761 : referenced;
762 :
763 171 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
764 :
765 : /*
766 : * Check for duplicates. Note that this does not really prevent
767 : * duplicates, it's here just to provide nicer error message in common
768 : * case. The real protection is the unique key on the catalog.
769 : */
770 171 : if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
771 : ObjectIdGetDatum(schemaid),
772 : ObjectIdGetDatum(pubid)))
773 : {
774 12 : table_close(rel, RowExclusiveLock);
775 :
776 12 : if (if_not_exists)
777 8 : return InvalidObjectAddress;
778 :
779 4 : ereport(ERROR,
780 : (errcode(ERRCODE_DUPLICATE_OBJECT),
781 : errmsg("schema \"%s\" is already member of publication \"%s\"",
782 : get_namespace_name(schemaid), pub->name)));
783 : }
784 :
785 159 : check_publication_add_schema(schemaid);
786 :
787 : /* Form a tuple */
788 155 : memset(values, 0, sizeof(values));
789 155 : memset(nulls, false, sizeof(nulls));
790 :
791 155 : psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId,
792 : Anum_pg_publication_namespace_oid);
793 155 : values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid);
794 155 : values[Anum_pg_publication_namespace_pnpubid - 1] =
795 155 : ObjectIdGetDatum(pubid);
796 155 : values[Anum_pg_publication_namespace_pnnspid - 1] =
797 155 : ObjectIdGetDatum(schemaid);
798 :
799 155 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
800 :
801 : /* Insert tuple into catalog */
802 155 : CatalogTupleInsert(rel, tup);
803 155 : heap_freetuple(tup);
804 :
805 155 : ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid);
806 :
807 : /* Add dependency on the publication */
808 155 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
809 155 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
810 :
811 : /* Add dependency on the schema */
812 155 : ObjectAddressSet(referenced, NamespaceRelationId, schemaid);
813 155 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
814 :
815 : /* Close the table */
816 155 : table_close(rel, RowExclusiveLock);
817 :
818 : /*
819 : * Invalidate relcache so that publication info is rebuilt. See
820 : * publication_add_relation for why we need to consider all the
821 : * partitions.
822 : */
823 155 : schemaRels = GetSchemaPublicationRelations(schemaid,
824 : PUBLICATION_PART_ALL);
825 155 : InvalidatePublicationRels(schemaRels);
826 :
827 155 : return myself;
828 : }
829 :
830 : /*
831 : * Internal function to get the list of publication oids for a relation.
832 : *
833 : * If except_flag is true, returns the list of publications that specified the
834 : * relation in EXCEPT clause; otherwise, returns the list of publications
835 : * in which relation is included.
    : *
    : * The pg_publication_rel entries for relid are fetched as a syscache list
    : * (PUBLICATIONRELMAP); the prpubid of every entry whose prexcept equals
    : * except_flag is appended to the result. The syscache list is released
    : * before returning. The result is NIL when nothing matches.
836 : */
837 : static List *
838 15539 : get_relation_publications(Oid relid, bool except_flag)
839 : {
840 15539 : List *result = NIL;
841 : CatCList *pubrellist;
842 :
843 : /* Find all publications associated with the relation. */
844 15539 : pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
845 : ObjectIdGetDatum(relid));
846 16993 : for (int i = 0; i < pubrellist->n_members; i++)
847 : {
848 1454 : HeapTuple tup = &pubrellist->members[i]->tuple;
849 1454 : Form_pg_publication_rel pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
850 1454 : Oid pubid = pubrel->prpubid;
851 :
852 1454 : if (pubrel->prexcept == except_flag)
853 1020 : result = lappend_oid(result, pubid);
854 : }
855 :
856 15539 : ReleaseSysCacheList(pubrellist);
857 :
858 15539 : return result;
859 : }
860 :
861 : /*
862 : * Gets list of publication oids for a relation.
    : *
    : * Only publications whose pg_publication_rel entry for relid is not an
    : * EXCEPT entry are returned (get_relation_publications() with
    : * except_flag = false).
863 : */
864 : List *
865 8335 : GetRelationIncludedPublications(Oid relid)
866 : {
867 8335 : return get_relation_publications(relid, false);
868 : }
869 :
870 : /*
871 : * Gets list of oids of the publications that have the relation in their
    : * EXCEPT clause (get_relation_publications() with except_flag = true).
872 : */
873 : List *
874 7204 : GetRelationExcludedPublications(Oid relid)
875 : {
876 7204 : return get_relation_publications(relid, true);
877 : }
878 :
879 : /*
880 : * Internal function to get the list of relation oids for a publication.
881 : *
882 : * If except_flag is true, returns the list of relations specified in the
883 : * EXCEPT clause of the publication; otherwise, returns the list of relations
884 : * included in the publication.
    : *
    : * pg_publication_rel is scanned under AccessShareLock with a scan key on
    : * prpubid. The prrelid of every row whose prexcept equals except_flag is
    : * expanded through GetPubPartitionOptionRelations() according to
    : * pub_partopt. The resulting OID list is sorted with list_oid_cmp and
    : * de-duplicated before it is returned.
885 : */
886 : static List *
887 1539 : get_publication_relations(Oid pubid, PublicationPartOpt pub_partopt,
888 : bool except_flag)
889 : {
890 : List *result;
891 : Relation pubrelsrel;
892 : ScanKeyData scankey;
893 : SysScanDesc scan;
894 : HeapTuple tup;
895 :
896 : /* Find all relations associated with the publication. */
897 1539 : pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
898 :
899 1539 : ScanKeyInit(&scankey,
900 : Anum_pg_publication_rel_prpubid,
901 : BTEqualStrategyNumber, F_OIDEQ,
902 : ObjectIdGetDatum(pubid));
903 :
904 1539 : scan = systable_beginscan(pubrelsrel, PublicationRelPrpubidIndexId,
905 : true, NULL, 1, &scankey);
906 :
907 1539 : result = NIL;
908 4793 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
909 : {
910 : Form_pg_publication_rel pubrel;
911 :
912 1715 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
913 :
914 1715 : if (except_flag == pubrel->prexcept)
915 1715 : result = GetPubPartitionOptionRelations(result, pub_partopt,
916 : pubrel->prrelid);
917 : }
918 :
919 1539 : systable_endscan(scan);
920 1539 : table_close(pubrelsrel, AccessShareLock);
921 :
922 : /* Now sort and de-duplicate the result list */
923 1539 : list_sort(result, list_oid_cmp);
924 1539 : list_deduplicate_oid(result);
925 :
926 1539 : return result;
927 : }
928 :
929 : /*
930 : * Gets list of relation oids that are associated with a publication.
931 : *
932 : * This should only be used FOR TABLE publications, the FOR ALL TABLES/SEQUENCES
933 : * should use GetAllPublicationRelations().
    : *
    : * The restriction is enforced by an assertion that the publication's
    : * alltables flag is not set. The work is delegated to
    : * get_publication_relations() with except_flag = false.
934 : */
935 : List *
936 1319 : GetIncludedPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
937 : {
938 : Assert(!GetPublication(pubid)->alltables);
939 :
940 1319 : return get_publication_relations(pubid, pub_partopt, false);
941 : }
942 :
943 : /*
944 : * Gets list of table oids that were specified in the EXCEPT clause for a
945 : * publication.
946 : *
947 : * This should only be used FOR ALL TABLES publications.
    : *
    : * The restriction is enforced by an assertion that the publication's
    : * alltables flag is set. The work is delegated to
    : * get_publication_relations() with except_flag = true.
948 : */
949 : List *
950 220 : GetExcludedPublicationTables(Oid pubid, PublicationPartOpt pub_partopt)
951 : {
952 : Assert(GetPublication(pubid)->alltables);
953 :
954 220 : return get_publication_relations(pubid, pub_partopt, true);
955 : }
956 :
957 : /*
958 : * Gets list of publication oids for publications marked as FOR ALL TABLES.
    : *
    : * pg_publication is opened under AccessShareLock and scanned with a
    : * single key requiring puballtables = true; no index is named for the
    : * scan (InvalidOid is passed). The oid of each matching row is appended
    : * to the result, which is NIL when there is no such publication.
959 : */
960 : List *
961 5518 : GetAllTablesPublications(void)
962 : {
963 : List *result;
964 : Relation rel;
965 : ScanKeyData scankey;
966 : SysScanDesc scan;
967 : HeapTuple tup;
968 :
969 : /* Find all publications that are marked as for all tables. */
970 5518 : rel = table_open(PublicationRelationId, AccessShareLock);
971 :
972 5518 : ScanKeyInit(&scankey,
973 : Anum_pg_publication_puballtables,
974 : BTEqualStrategyNumber, F_BOOLEQ,
975 : BoolGetDatum(true));
976 :
977 5518 : scan = systable_beginscan(rel, InvalidOid, false,
978 : NULL, 1, &scankey);
979 :
980 5518 : result = NIL;
981 5631 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
982 : {
983 113 : Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
984 :
985 113 : result = lappend_oid(result, oid);
986 : }
987 :
988 5518 : systable_endscan(scan);
989 5518 : table_close(rel, AccessShareLock);
990 :
991 5518 : return result;
992 : }
993 :
994 : /*
995 : * Gets list of all relations published by FOR ALL TABLES/SEQUENCES
996 : * publication.
997 : *
998 : * If the publication publishes partition changes via their respective root
999 : * partitioned tables, we must exclude partitions in favor of including the
1000 : * root partitioned tables. This is not applicable to FOR ALL SEQUENCES
1001 : * publication.
1002 : *
1003 : * For a FOR ALL TABLES publication, the returned list excludes tables mentioned
1004 : * in EXCEPT TABLE clause.
1005 : */
1006 : List *
1007 209 : GetAllPublicationRelations(Oid pubid, char relkind, bool pubviaroot)
1008 : {
1009 : Relation classRel;
1010 : ScanKeyData key[1];
1011 : TableScanDesc scan;
1012 : HeapTuple tuple;
1013 209 : List *result = NIL;
1014 209 : List *exceptlist = NIL;
1015 :
1016 : Assert(!(relkind == RELKIND_SEQUENCE && pubviaroot));
1017 :
1018 : /* EXCEPT filtering applies only to relations, not sequences */
1019 209 : if (relkind == RELKIND_RELATION)
1020 203 : exceptlist = GetExcludedPublicationTables(pubid, pubviaroot ?
1021 203 : PUBLICATION_PART_ROOT :
1022 : PUBLICATION_PART_LEAF);
1023 :
1024 209 : classRel = table_open(RelationRelationId, AccessShareLock);
1025 :
1026 209 : ScanKeyInit(&key[0],
1027 : Anum_pg_class_relkind,
1028 : BTEqualStrategyNumber, F_CHAREQ,
1029 : CharGetDatum(relkind));
1030 :
1031 209 : scan = table_beginscan_catalog(classRel, 1, key);
1032 :
1033 15963 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1034 : {
1035 15754 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
1036 15754 : Oid relid = relForm->oid;
1037 :
1038 15754 : if (is_publishable_class(relid, relForm) &&
1039 929 : !(relForm->relispartition && pubviaroot) &&
1040 832 : !list_member_oid(exceptlist, relid))
1041 798 : result = lappend_oid(result, relid);
1042 : }
1043 :
1044 209 : table_endscan(scan);
1045 :
1046 209 : if (pubviaroot)
1047 : {
1048 16 : ScanKeyInit(&key[0],
1049 : Anum_pg_class_relkind,
1050 : BTEqualStrategyNumber, F_CHAREQ,
1051 : CharGetDatum(RELKIND_PARTITIONED_TABLE));
1052 :
1053 16 : scan = table_beginscan_catalog(classRel, 1, key);
1054 :
1055 87 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1056 : {
1057 71 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
1058 71 : Oid relid = relForm->oid;
1059 :
1060 71 : if (is_publishable_class(relid, relForm) &&
1061 71 : !relForm->relispartition &&
1062 55 : !list_member_oid(exceptlist, relid))
1063 52 : result = lappend_oid(result, relid);
1064 : }
1065 :
1066 16 : table_endscan(scan);
1067 : }
1068 :
1069 209 : table_close(classRel, AccessShareLock);
1070 209 : return result;
1071 : }
1072 :
1073 : /*
1074 : * Gets the list of schema oids for a publication.
1075 : *
1076 : * This should only be used FOR TABLES IN SCHEMA publications.
1077 : */
1078 : List *
1079 1281 : GetPublicationSchemas(Oid pubid)
1080 : {
1081 1281 : List *result = NIL;
1082 : Relation pubschsrel;
1083 : ScanKeyData scankey;
1084 : SysScanDesc scan;
1085 : HeapTuple tup;
1086 :
1087 : /* Find all schemas associated with the publication */
1088 1281 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
1089 :
1090 1281 : ScanKeyInit(&scankey,
1091 : Anum_pg_publication_namespace_pnpubid,
1092 : BTEqualStrategyNumber, F_OIDEQ,
1093 : ObjectIdGetDatum(pubid));
1094 :
1095 1281 : scan = systable_beginscan(pubschsrel,
1096 : PublicationNamespacePnnspidPnpubidIndexId,
1097 : true, NULL, 1, &scankey);
1098 1339 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
1099 : {
1100 : Form_pg_publication_namespace pubsch;
1101 :
1102 58 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1103 :
1104 58 : result = lappend_oid(result, pubsch->pnnspid);
1105 : }
1106 :
1107 1281 : systable_endscan(scan);
1108 1281 : table_close(pubschsrel, AccessShareLock);
1109 :
1110 1281 : return result;
1111 : }
1112 :
1113 : /*
1114 : * Gets the list of publication oids associated with a specified schema.
1115 : */
1116 : List *
1117 8039 : GetSchemaPublications(Oid schemaid)
1118 : {
1119 8039 : List *result = NIL;
1120 : CatCList *pubschlist;
1121 : int i;
1122 :
1123 : /* Find all publications associated with the schema */
1124 8039 : pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
1125 : ObjectIdGetDatum(schemaid));
1126 8103 : for (i = 0; i < pubschlist->n_members; i++)
1127 : {
1128 64 : HeapTuple tup = &pubschlist->members[i]->tuple;
1129 64 : Oid pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
1130 :
1131 64 : result = lappend_oid(result, pubid);
1132 : }
1133 :
1134 8039 : ReleaseSysCacheList(pubschlist);
1135 :
1136 8039 : return result;
1137 : }
1138 :
1139 : /*
1140 : * Get the list of publishable relation oids for a specified schema.
1141 : */
1142 : List *
1143 320 : GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
1144 : {
1145 : Relation classRel;
1146 : ScanKeyData key[1];
1147 : TableScanDesc scan;
1148 : HeapTuple tuple;
1149 320 : List *result = NIL;
1150 :
1151 : Assert(OidIsValid(schemaid));
1152 :
1153 320 : classRel = table_open(RelationRelationId, AccessShareLock);
1154 :
1155 320 : ScanKeyInit(&key[0],
1156 : Anum_pg_class_relnamespace,
1157 : BTEqualStrategyNumber, F_OIDEQ,
1158 : ObjectIdGetDatum(schemaid));
1159 :
1160 : /* get all the relations present in the specified schema */
1161 320 : scan = table_beginscan_catalog(classRel, 1, key);
1162 17194 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1163 : {
1164 16874 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
1165 16874 : Oid relid = relForm->oid;
1166 : char relkind;
1167 :
1168 16874 : if (!is_publishable_class(relid, relForm))
1169 5795 : continue;
1170 :
1171 11079 : relkind = get_rel_relkind(relid);
1172 11079 : if (relkind == RELKIND_RELATION)
1173 9510 : result = lappend_oid(result, relid);
1174 1569 : else if (relkind == RELKIND_PARTITIONED_TABLE)
1175 : {
1176 511 : List *partitionrels = NIL;
1177 :
1178 : /*
1179 : * It is quite possible that some of the partitions are in a
1180 : * different schema than the parent table, so we need to get such
1181 : * partitions separately.
1182 : */
1183 511 : partitionrels = GetPubPartitionOptionRelations(partitionrels,
1184 : pub_partopt,
1185 : relForm->oid);
1186 511 : result = list_concat_unique_oid(result, partitionrels);
1187 : }
1188 : }
1189 :
1190 320 : table_endscan(scan);
1191 320 : table_close(classRel, AccessShareLock);
1192 320 : return result;
1193 : }
1194 :
1195 : /*
1196 : * Gets the list of all relations published by FOR TABLES IN SCHEMA
1197 : * publication.
1198 : */
1199 : List *
1200 992 : GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
1201 : {
1202 992 : List *result = NIL;
1203 992 : List *pubschemalist = GetPublicationSchemas(pubid);
1204 : ListCell *cell;
1205 :
1206 1030 : foreach(cell, pubschemalist)
1207 : {
1208 38 : Oid schemaid = lfirst_oid(cell);
1209 38 : List *schemaRels = NIL;
1210 :
1211 38 : schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt);
1212 38 : result = list_concat(result, schemaRels);
1213 : }
1214 :
1215 992 : return result;
1216 : }
1217 :
1218 : /*
1219 : * Get publication using oid
1220 : *
1221 : * The Publication struct and its data are palloc'ed here.
1222 : */
1223 : Publication *
1224 5330 : GetPublication(Oid pubid)
1225 : {
1226 : HeapTuple tup;
1227 : Publication *pub;
1228 : Form_pg_publication pubform;
1229 :
1230 5330 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1231 5330 : if (!HeapTupleIsValid(tup))
1232 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1233 :
1234 5330 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1235 :
1236 5330 : pub = palloc_object(Publication);
1237 5330 : pub->oid = pubid;
1238 5330 : pub->name = pstrdup(NameStr(pubform->pubname));
1239 5330 : pub->alltables = pubform->puballtables;
1240 5330 : pub->allsequences = pubform->puballsequences;
1241 5330 : pub->pubactions.pubinsert = pubform->pubinsert;
1242 5330 : pub->pubactions.pubupdate = pubform->pubupdate;
1243 5330 : pub->pubactions.pubdelete = pubform->pubdelete;
1244 5330 : pub->pubactions.pubtruncate = pubform->pubtruncate;
1245 5330 : pub->pubviaroot = pubform->pubviaroot;
1246 5330 : pub->pubgencols_type = pubform->pubgencols;
1247 :
1248 5330 : ReleaseSysCache(tup);
1249 :
1250 5330 : return pub;
1251 : }
1252 :
1253 : /*
1254 : * Get Publication using name.
1255 : */
1256 : Publication *
1257 1661 : GetPublicationByName(const char *pubname, bool missing_ok)
1258 : {
1259 : Oid oid;
1260 :
1261 1661 : oid = get_publication_oid(pubname, missing_ok);
1262 :
1263 1660 : return OidIsValid(oid) ? GetPublication(oid) : NULL;
1264 : }
1265 :
1266 : /*
1267 : * Get information of the tables in the given publication array.
1268 : *
1269 : * Returns pubid, relid, column list, row filter for each table.
1270 : */
1271 : Datum
1272 3251 : pg_get_publication_tables(PG_FUNCTION_ARGS)
1273 : {
1274 : #define NUM_PUBLICATION_TABLES_ELEM 4
1275 : FuncCallContext *funcctx;
1276 3251 : List *table_infos = NIL;
1277 :
1278 : /* stuff done only on the first call of the function */
1279 3251 : if (SRF_IS_FIRSTCALL())
1280 : {
1281 : TupleDesc tupdesc;
1282 : MemoryContext oldcontext;
1283 : ArrayType *arr;
1284 : Datum *elems;
1285 : int nelems,
1286 : i;
1287 1068 : bool viaroot = false;
1288 :
1289 : /* create a function context for cross-call persistence */
1290 1068 : funcctx = SRF_FIRSTCALL_INIT();
1291 :
1292 : /* switch to memory context appropriate for multiple function calls */
1293 1068 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1294 :
1295 : /*
1296 : * Deconstruct the parameter into elements where each element is a
1297 : * publication name.
1298 : */
1299 1068 : arr = PG_GETARG_ARRAYTYPE_P(0);
1300 1068 : deconstruct_array_builtin(arr, TEXTOID, &elems, NULL, &nelems);
1301 :
1302 : /* Get Oids of tables from each publication. */
1303 2183 : for (i = 0; i < nelems; i++)
1304 : {
1305 : Publication *pub_elem;
1306 1116 : List *pub_elem_tables = NIL;
1307 : ListCell *lc;
1308 :
1309 1116 : pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]), false);
1310 :
1311 : /*
1312 : * Publications support partitioned tables. If
1313 : * publish_via_partition_root is false, all changes are replicated
1314 : * using leaf partition identity and schema, so we only need
1315 : * those. Otherwise, get the partitioned table itself.
1316 : */
1317 1115 : if (pub_elem->alltables)
1318 203 : pub_elem_tables = GetAllPublicationRelations(pub_elem->oid,
1319 : RELKIND_RELATION,
1320 203 : pub_elem->pubviaroot);
1321 : else
1322 : {
1323 : List *relids,
1324 : *schemarelids;
1325 :
1326 912 : relids = GetIncludedPublicationRelations(pub_elem->oid,
1327 912 : pub_elem->pubviaroot ?
1328 912 : PUBLICATION_PART_ROOT :
1329 : PUBLICATION_PART_LEAF);
1330 912 : schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
1331 912 : pub_elem->pubviaroot ?
1332 912 : PUBLICATION_PART_ROOT :
1333 : PUBLICATION_PART_LEAF);
1334 912 : pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
1335 : }
1336 :
1337 : /*
1338 : * Record the published table and the corresponding publication so
1339 : * that we can get row filters and column lists later.
1340 : *
1341 : * When a table is published by multiple publications, to obtain
1342 : * all row filters and column lists, the structure related to this
1343 : * table will be recorded multiple times.
1344 : */
1345 3342 : foreach(lc, pub_elem_tables)
1346 : {
1347 2227 : published_rel *table_info = palloc_object(published_rel);
1348 :
1349 2227 : table_info->relid = lfirst_oid(lc);
1350 2227 : table_info->pubid = pub_elem->oid;
1351 2227 : table_infos = lappend(table_infos, table_info);
1352 : }
1353 :
1354 : /* At least one publication is using publish_via_partition_root. */
1355 1115 : if (pub_elem->pubviaroot)
1356 193 : viaroot = true;
1357 : }
1358 :
1359 : /*
1360 : * If the publication publishes partition changes via their respective
1361 : * root partitioned tables, we must exclude partitions in favor of
1362 : * including the root partitioned tables. Otherwise, the function
1363 : * could return both the child and parent tables which could cause
1364 : * data of the child table to be double-published on the subscriber
1365 : * side.
1366 : */
1367 1067 : if (viaroot)
1368 185 : filter_partitions(table_infos);
1369 :
1370 : /* Construct a tuple descriptor for the result rows. */
1371 1067 : tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
1372 1067 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
1373 : OIDOID, -1, 0);
1374 1067 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
1375 : OIDOID, -1, 0);
1376 1067 : TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
1377 : INT2VECTOROID, -1, 0);
1378 1067 : TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
1379 : PG_NODE_TREEOID, -1, 0);
1380 :
1381 1067 : TupleDescFinalize(tupdesc);
1382 1067 : funcctx->tuple_desc = BlessTupleDesc(tupdesc);
1383 1067 : funcctx->user_fctx = table_infos;
1384 :
1385 1067 : MemoryContextSwitchTo(oldcontext);
1386 : }
1387 :
1388 : /* stuff done on every call of the function */
1389 3250 : funcctx = SRF_PERCALL_SETUP();
1390 3250 : table_infos = (List *) funcctx->user_fctx;
1391 :
1392 3250 : if (funcctx->call_cntr < list_length(table_infos))
1393 : {
1394 2183 : HeapTuple pubtuple = NULL;
1395 : HeapTuple rettuple;
1396 : Publication *pub;
1397 2183 : published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
1398 2183 : Oid relid = table_info->relid;
1399 2183 : Oid schemaid = get_rel_namespace(relid);
1400 2183 : Datum values[NUM_PUBLICATION_TABLES_ELEM] = {0};
1401 2183 : bool nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
1402 :
1403 : /*
1404 : * Form tuple with appropriate data.
1405 : */
1406 :
1407 2183 : pub = GetPublication(table_info->pubid);
1408 :
1409 2183 : values[0] = ObjectIdGetDatum(pub->oid);
1410 2183 : values[1] = ObjectIdGetDatum(relid);
1411 :
1412 : /*
1413 : * We don't consider row filters or column lists for FOR ALL TABLES or
1414 : * FOR TABLES IN SCHEMA publications.
1415 : */
1416 2183 : if (!pub->alltables &&
1417 1350 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
1418 : ObjectIdGetDatum(schemaid),
1419 : ObjectIdGetDatum(pub->oid)))
1420 1300 : pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
1421 : ObjectIdGetDatum(relid),
1422 : ObjectIdGetDatum(pub->oid));
1423 :
1424 2183 : if (HeapTupleIsValid(pubtuple))
1425 : {
1426 : /* Lookup the column list attribute. */
1427 1187 : values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1428 : Anum_pg_publication_rel_prattrs,
1429 : &(nulls[2]));
1430 :
1431 : /* Null indicates no filter. */
1432 1187 : values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1433 : Anum_pg_publication_rel_prqual,
1434 : &(nulls[3]));
1435 : }
1436 : else
1437 : {
1438 996 : nulls[2] = true;
1439 996 : nulls[3] = true;
1440 : }
1441 :
1442 : /* Show all columns when the column list is not specified. */
1443 2183 : if (nulls[2])
1444 : {
1445 2057 : Relation rel = table_open(relid, AccessShareLock);
1446 2057 : int nattnums = 0;
1447 : int16 *attnums;
1448 2057 : TupleDesc desc = RelationGetDescr(rel);
1449 : int i;
1450 :
1451 2057 : attnums = palloc_array(int16, desc->natts);
1452 :
1453 5596 : for (i = 0; i < desc->natts; i++)
1454 : {
1455 3539 : Form_pg_attribute att = TupleDescAttr(desc, i);
1456 :
1457 3539 : if (att->attisdropped)
1458 6 : continue;
1459 :
1460 3533 : if (att->attgenerated)
1461 : {
1462 : /* We only support replication of STORED generated cols. */
1463 48 : if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
1464 36 : continue;
1465 :
1466 : /*
1467 : * User hasn't requested to replicate STORED generated
1468 : * cols.
1469 : */
1470 12 : if (pub->pubgencols_type != PUBLISH_GENCOLS_STORED)
1471 9 : continue;
1472 : }
1473 :
1474 3488 : attnums[nattnums++] = att->attnum;
1475 : }
1476 :
1477 2057 : if (nattnums > 0)
1478 : {
1479 2035 : values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
1480 2035 : nulls[2] = false;
1481 : }
1482 :
1483 2057 : table_close(rel, AccessShareLock);
1484 : }
1485 :
1486 2183 : rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
1487 :
1488 2183 : SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
1489 : }
1490 :
1491 1067 : SRF_RETURN_DONE(funcctx);
1492 : }
1493 :
1494 : /*
1495 : * Returns Oids of sequences in a publication.
1496 : */
1497 : Datum
1498 233 : pg_get_publication_sequences(PG_FUNCTION_ARGS)
1499 : {
1500 : FuncCallContext *funcctx;
1501 233 : List *sequences = NIL;
1502 :
1503 : /* stuff done only on the first call of the function */
1504 233 : if (SRF_IS_FIRSTCALL())
1505 : {
1506 216 : char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1507 : Publication *publication;
1508 : MemoryContext oldcontext;
1509 :
1510 : /* create a function context for cross-call persistence */
1511 216 : funcctx = SRF_FIRSTCALL_INIT();
1512 :
1513 : /* switch to memory context appropriate for multiple function calls */
1514 216 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1515 :
1516 216 : publication = GetPublicationByName(pubname, false);
1517 :
1518 216 : if (publication->allsequences)
1519 6 : sequences = GetAllPublicationRelations(publication->oid,
1520 : RELKIND_SEQUENCE,
1521 : false);
1522 :
1523 216 : funcctx->user_fctx = sequences;
1524 :
1525 216 : MemoryContextSwitchTo(oldcontext);
1526 : }
1527 :
1528 : /* stuff done on every call of the function */
1529 233 : funcctx = SRF_PERCALL_SETUP();
1530 233 : sequences = (List *) funcctx->user_fctx;
1531 :
1532 233 : if (funcctx->call_cntr < list_length(sequences))
1533 : {
1534 17 : Oid relid = list_nth_oid(sequences, funcctx->call_cntr);
1535 :
1536 17 : SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
1537 : }
1538 :
1539 216 : SRF_RETURN_DONE(funcctx);
1540 : }
|