Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_publication.c
4 : * publication C API manipulation
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/catalog/pg_publication.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/heapam.h"
19 : #include "access/htup_details.h"
20 : #include "access/tableam.h"
21 : #include "catalog/catalog.h"
22 : #include "catalog/dependency.h"
23 : #include "catalog/indexing.h"
24 : #include "catalog/namespace.h"
25 : #include "catalog/objectaddress.h"
26 : #include "catalog/partition.h"
27 : #include "catalog/pg_inherits.h"
28 : #include "catalog/pg_namespace.h"
29 : #include "catalog/pg_publication.h"
30 : #include "catalog/pg_publication_namespace.h"
31 : #include "catalog/pg_publication_rel.h"
32 : #include "catalog/pg_type.h"
33 : #include "commands/publicationcmds.h"
34 : #include "funcapi.h"
35 : #include "utils/array.h"
36 : #include "utils/builtins.h"
37 : #include "utils/catcache.h"
38 : #include "utils/fmgroids.h"
39 : #include "utils/lsyscache.h"
40 : #include "utils/rel.h"
41 : #include "utils/syscache.h"
42 :
43 : /* Records association between publication and published table */
44 : typedef struct
45 : {
46 : Oid relid; /* OID of published table */
47 : Oid pubid; /* OID of publication that publishes this
48 : * table. */
49 : } published_rel;
50 :
51 : /*
52 : * Check if relation can be in given publication and throws appropriate
53 : * error if not.
54 : */
55 : static void
56 1128 : check_publication_add_relation(Relation targetrel)
57 : {
58 : /* Must be a regular or partitioned table */
59 1128 : if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
60 144 : RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
61 14 : ereport(ERROR,
62 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
63 : errmsg("cannot add relation \"%s\" to publication",
64 : RelationGetRelationName(targetrel)),
65 : errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind)));
66 :
67 : /* Can't be system table */
68 1114 : if (IsCatalogRelation(targetrel))
69 6 : ereport(ERROR,
70 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
71 : errmsg("cannot add relation \"%s\" to publication",
72 : RelationGetRelationName(targetrel)),
73 : errdetail("This operation is not supported for system tables.")));
74 :
75 : /* UNLOGGED and TEMP relations cannot be part of publication. */
76 1108 : if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
77 6 : ereport(ERROR,
78 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
79 : errmsg("cannot add relation \"%s\" to publication",
80 : RelationGetRelationName(targetrel)),
81 : errdetail("This operation is not supported for temporary tables.")));
82 1102 : else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
83 6 : ereport(ERROR,
84 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
85 : errmsg("cannot add relation \"%s\" to publication",
86 : RelationGetRelationName(targetrel)),
87 : errdetail("This operation is not supported for unlogged tables.")));
88 1096 : }
89 :
90 : /*
91 : * Check if schema can be in given publication and throw appropriate error if
92 : * not.
93 : */
94 : static void
95 244 : check_publication_add_schema(Oid schemaid)
96 : {
97 : /* Can't be system namespace */
98 244 : if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid))
99 6 : ereport(ERROR,
100 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
101 : errmsg("cannot add schema \"%s\" to publication",
102 : get_namespace_name(schemaid)),
103 : errdetail("This operation is not supported for system schemas.")));
104 :
105 : /* Can't be temporary namespace */
106 238 : if (isAnyTempNamespace(schemaid))
107 0 : ereport(ERROR,
108 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
109 : errmsg("cannot add schema \"%s\" to publication",
110 : get_namespace_name(schemaid)),
111 : errdetail("Temporary schemas cannot be replicated.")));
112 238 : }
113 :
114 : /*
115 : * Returns if relation represented by oid and Form_pg_class entry
116 : * is publishable.
117 : *
118 : * Does same checks as check_publication_add_relation() above except for
119 : * RELKIND_SEQUENCE, but does not need relation to be opened and also does
120 : * not throw errors. Here, the additional check is to support ALL SEQUENCES
121 : * publication.
122 : *
123 : * XXX This also excludes all tables with relid < FirstNormalObjectId,
124 : * ie all tables created during initdb. This mainly affects the preinstalled
125 : * information_schema. IsCatalogRelationOid() only excludes tables with
126 : * relid < FirstUnpinnedObjectId, making that test rather redundant,
127 : * but really we should get rid of the FirstNormalObjectId test not
128 : * IsCatalogRelationOid. We can't do so today because we don't want
129 : * information_schema tables to be considered publishable; but this test
130 : * is really inadequate for that, since the information_schema could be
131 : * dropped and reloaded and then it'll be considered publishable. The best
132 : * long-term solution may be to add a "relispublishable" bool to pg_class,
133 : * and depend on that instead of OID checks.
134 : */
135 : static bool
136 599306 : is_publishable_class(Oid relid, Form_pg_class reltuple)
137 : {
138 611594 : return (reltuple->relkind == RELKIND_RELATION ||
139 12288 : reltuple->relkind == RELKIND_PARTITIONED_TABLE ||
140 10784 : reltuple->relkind == RELKIND_SEQUENCE) &&
141 590554 : !IsCatalogRelationOid(relid) &&
142 1198612 : reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
143 : relid >= FirstNormalObjectId;
144 : }
145 :
146 : /*
147 : * Another variant of is_publishable_class(), taking a Relation.
148 : */
149 : bool
150 542690 : is_publishable_relation(Relation rel)
151 : {
152 542690 : return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
153 : }
154 :
155 : /*
156 : * SQL-callable variant of the above
157 : *
158 : * This returns null when the relation does not exist. This is intended to be
159 : * used for example in psql to avoid gratuitous errors when there are
160 : * concurrent catalog changes.
161 : */
162 : Datum
163 6260 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
164 : {
165 6260 : Oid relid = PG_GETARG_OID(0);
166 : HeapTuple tuple;
167 : bool result;
168 :
169 6260 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
170 6260 : if (!HeapTupleIsValid(tuple))
171 0 : PG_RETURN_NULL();
172 6260 : result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
173 6260 : ReleaseSysCache(tuple);
174 6260 : PG_RETURN_BOOL(result);
175 : }
176 :
177 : /*
178 : * Returns true if the ancestor is in the list of published relations.
179 : * Otherwise, returns false.
180 : */
181 : static bool
182 202 : is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
183 : {
184 : ListCell *lc;
185 :
186 694 : foreach(lc, table_infos)
187 : {
188 572 : Oid relid = ((published_rel *) lfirst(lc))->relid;
189 :
190 572 : if (relid == ancestor)
191 80 : return true;
192 : }
193 :
194 122 : return false;
195 : }
196 :
197 : /*
198 : * Filter out the partitions whose parent tables are also present in the list.
199 : */
200 : static void
201 352 : filter_partitions(List *table_infos)
202 : {
203 : ListCell *lc;
204 :
205 1034 : foreach(lc, table_infos)
206 : {
207 682 : bool skip = false;
208 682 : List *ancestors = NIL;
209 : ListCell *lc2;
210 682 : published_rel *table_info = (published_rel *) lfirst(lc);
211 :
212 682 : if (get_rel_relispartition(table_info->relid))
213 202 : ancestors = get_partition_ancestors(table_info->relid);
214 :
215 804 : foreach(lc2, ancestors)
216 : {
217 202 : Oid ancestor = lfirst_oid(lc2);
218 :
219 202 : if (is_ancestor_member_tableinfos(ancestor, table_infos))
220 : {
221 80 : skip = true;
222 80 : break;
223 : }
224 : }
225 :
226 682 : if (skip)
227 80 : table_infos = foreach_delete_current(table_infos, lc);
228 : }
229 352 : }
230 :
231 : /*
232 : * Returns true if any schema is associated with the publication, false if no
233 : * schema is associated with the publication.
234 : */
235 : bool
236 284 : is_schema_publication(Oid pubid)
237 : {
238 : Relation pubschsrel;
239 : ScanKeyData scankey;
240 : SysScanDesc scan;
241 : HeapTuple tup;
242 284 : bool result = false;
243 :
244 284 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
245 284 : ScanKeyInit(&scankey,
246 : Anum_pg_publication_namespace_pnpubid,
247 : BTEqualStrategyNumber, F_OIDEQ,
248 : ObjectIdGetDatum(pubid));
249 :
250 284 : scan = systable_beginscan(pubschsrel,
251 : PublicationNamespacePnnspidPnpubidIndexId,
252 : true, NULL, 1, &scankey);
253 284 : tup = systable_getnext(scan);
254 284 : result = HeapTupleIsValid(tup);
255 :
256 284 : systable_endscan(scan);
257 284 : table_close(pubschsrel, AccessShareLock);
258 :
259 284 : return result;
260 : }
261 :
262 : /*
263 : * Returns true if the relation has column list associated with the
264 : * publication, false otherwise.
265 : *
266 : * If a column list is found, the corresponding bitmap is returned through the
267 : * cols parameter, if provided. The bitmap is constructed within the given
268 : * memory context (mcxt).
269 : */
270 : bool
271 1586 : check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt,
272 : Bitmapset **cols)
273 : {
274 : HeapTuple cftuple;
275 1586 : bool found = false;
276 :
277 1586 : if (pub->alltables)
278 386 : return false;
279 :
280 1200 : cftuple = SearchSysCache2(PUBLICATIONRELMAP,
281 : ObjectIdGetDatum(relid),
282 : ObjectIdGetDatum(pub->oid));
283 1200 : if (HeapTupleIsValid(cftuple))
284 : {
285 : Datum cfdatum;
286 : bool isnull;
287 :
288 : /* Lookup the column list attribute. */
289 1098 : cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
290 : Anum_pg_publication_rel_prattrs, &isnull);
291 :
292 : /* Was a column list found? */
293 1098 : if (!isnull)
294 : {
295 : /* Build the column list bitmap in the given memory context. */
296 314 : if (cols)
297 308 : *cols = pub_collist_to_bitmapset(*cols, cfdatum, mcxt);
298 :
299 314 : found = true;
300 : }
301 :
302 1098 : ReleaseSysCache(cftuple);
303 : }
304 :
305 1200 : return found;
306 : }
307 :
308 : /*
309 : * Gets the relations based on the publication partition option for a specified
310 : * relation.
311 : */
312 : List *
313 5830 : GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
314 : Oid relid)
315 : {
316 5830 : if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
317 : pub_partopt != PUBLICATION_PART_ROOT)
318 1188 : {
319 1188 : List *all_parts = find_all_inheritors(relid, NoLock,
320 : NULL);
321 :
322 1188 : if (pub_partopt == PUBLICATION_PART_ALL)
323 986 : result = list_concat(result, all_parts);
324 202 : else if (pub_partopt == PUBLICATION_PART_LEAF)
325 : {
326 : ListCell *lc;
327 :
328 738 : foreach(lc, all_parts)
329 : {
330 536 : Oid partOid = lfirst_oid(lc);
331 :
332 536 : if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
333 334 : result = lappend_oid(result, partOid);
334 : }
335 : }
336 : else
337 : Assert(false);
338 : }
339 : else
340 4642 : result = lappend_oid(result, relid);
341 :
342 5830 : return result;
343 : }
344 :
345 : /*
346 : * Returns the relid of the topmost ancestor that is published via this
347 : * publication if any and set its ancestor level to ancestor_level,
348 : * otherwise returns InvalidOid.
349 : *
350 : * The ancestor_level value allows us to compare the results for multiple
351 : * publications, and decide which value is higher up.
352 : *
353 : * Note that the list of ancestors should be ordered such that the topmost
354 : * ancestor is at the end of the list.
355 : */
356 : Oid
357 510 : GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
358 : {
359 : ListCell *lc;
360 510 : Oid topmost_relid = InvalidOid;
361 510 : int level = 0;
362 :
363 : /*
364 : * Find the "topmost" ancestor that is in this publication.
365 : */
366 1036 : foreach(lc, ancestors)
367 : {
368 526 : Oid ancestor = lfirst_oid(lc);
369 526 : List *apubids = GetRelationPublications(ancestor);
370 526 : List *aschemaPubids = NIL;
371 :
372 526 : level++;
373 :
374 526 : if (list_member_oid(apubids, puboid))
375 : {
376 312 : topmost_relid = ancestor;
377 :
378 312 : if (ancestor_level)
379 86 : *ancestor_level = level;
380 : }
381 : else
382 : {
383 214 : aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
384 214 : if (list_member_oid(aschemaPubids, puboid))
385 : {
386 10 : topmost_relid = ancestor;
387 :
388 10 : if (ancestor_level)
389 10 : *ancestor_level = level;
390 : }
391 : }
392 :
393 526 : list_free(apubids);
394 526 : list_free(aschemaPubids);
395 : }
396 :
397 510 : return topmost_relid;
398 : }
399 :
400 : /*
401 : * attnumstoint2vector
402 : * Convert a Bitmapset of AttrNumbers into an int2vector.
403 : *
404 : * AttrNumber numbers are 0-based, i.e., not offset by
405 : * FirstLowInvalidHeapAttributeNumber.
406 : */
407 : static int2vector *
408 332 : attnumstoint2vector(Bitmapset *attrs)
409 : {
410 : int2vector *result;
411 332 : int n = bms_num_members(attrs);
412 332 : int i = -1;
413 332 : int j = 0;
414 :
415 332 : result = buildint2vector(NULL, n);
416 :
417 904 : while ((i = bms_next_member(attrs, i)) >= 0)
418 : {
419 : Assert(i <= PG_INT16_MAX);
420 :
421 572 : result->values[j++] = (int16) i;
422 : }
423 :
424 332 : return result;
425 : }
426 :
427 : /*
428 : * Insert new publication / relation mapping.
429 : */
430 : ObjectAddress
431 1150 : publication_add_relation(Oid pubid, PublicationRelInfo *pri,
432 : bool if_not_exists)
433 : {
434 : Relation rel;
435 : HeapTuple tup;
436 : Datum values[Natts_pg_publication_rel];
437 : bool nulls[Natts_pg_publication_rel];
438 1150 : Relation targetrel = pri->relation;
439 1150 : Oid relid = RelationGetRelid(targetrel);
440 : Oid pubreloid;
441 : Bitmapset *attnums;
442 1150 : Publication *pub = GetPublication(pubid);
443 : ObjectAddress myself,
444 : referenced;
445 1150 : List *relids = NIL;
446 : int i;
447 :
448 1150 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
449 :
450 : /*
451 : * Check for duplicates. Note that this does not really prevent
452 : * duplicates, it's here just to provide nicer error message in common
453 : * case. The real protection is the unique key on the catalog.
454 : */
455 1150 : if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
456 : ObjectIdGetDatum(pubid)))
457 : {
458 22 : table_close(rel, RowExclusiveLock);
459 :
460 22 : if (if_not_exists)
461 16 : return InvalidObjectAddress;
462 :
463 6 : ereport(ERROR,
464 : (errcode(ERRCODE_DUPLICATE_OBJECT),
465 : errmsg("relation \"%s\" is already member of publication \"%s\"",
466 : RelationGetRelationName(targetrel), pub->name)));
467 : }
468 :
469 1128 : check_publication_add_relation(targetrel);
470 :
471 : /* Validate and translate column names into a Bitmapset of attnums. */
472 1096 : attnums = pub_collist_validate(pri->relation, pri->columns);
473 :
474 : /* Form a tuple. */
475 1072 : memset(values, 0, sizeof(values));
476 1072 : memset(nulls, false, sizeof(nulls));
477 :
478 1072 : pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
479 : Anum_pg_publication_rel_oid);
480 1072 : values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid);
481 1072 : values[Anum_pg_publication_rel_prpubid - 1] =
482 1072 : ObjectIdGetDatum(pubid);
483 1072 : values[Anum_pg_publication_rel_prrelid - 1] =
484 1072 : ObjectIdGetDatum(relid);
485 :
486 : /* Add qualifications, if available */
487 1072 : if (pri->whereClause != NULL)
488 330 : values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
489 : else
490 742 : nulls[Anum_pg_publication_rel_prqual - 1] = true;
491 :
492 : /* Add column list, if available */
493 1072 : if (pri->columns)
494 332 : values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(attnumstoint2vector(attnums));
495 : else
496 740 : nulls[Anum_pg_publication_rel_prattrs - 1] = true;
497 :
498 1072 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
499 :
500 : /* Insert tuple into catalog. */
501 1072 : CatalogTupleInsert(rel, tup);
502 1072 : heap_freetuple(tup);
503 :
504 : /* Register dependencies as needed */
505 1072 : ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
506 :
507 : /* Add dependency on the publication */
508 1072 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
509 1072 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
510 :
511 : /* Add dependency on the relation */
512 1072 : ObjectAddressSet(referenced, RelationRelationId, relid);
513 1072 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
514 :
515 : /* Add dependency on the objects mentioned in the qualifications */
516 1072 : if (pri->whereClause)
517 330 : recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
518 : DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
519 : false);
520 :
521 : /* Add dependency on the columns, if any are listed */
522 1072 : i = -1;
523 1644 : while ((i = bms_next_member(attnums, i)) >= 0)
524 : {
525 572 : ObjectAddressSubSet(referenced, RelationRelationId, relid, i);
526 572 : recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
527 : }
528 :
529 : /* Close the table. */
530 1072 : table_close(rel, RowExclusiveLock);
531 :
532 : /*
533 : * Invalidate relcache so that publication info is rebuilt.
534 : *
535 : * For the partitioned tables, we must invalidate all partitions contained
536 : * in the respective partition hierarchies, not just the one explicitly
537 : * mentioned in the publication. This is required because we implicitly
538 : * publish the child tables when the parent table is published.
539 : */
540 1072 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
541 : relid);
542 :
543 1072 : InvalidatePublicationRels(relids);
544 :
545 1072 : return myself;
546 : }
547 :
548 : /*
549 : * pub_collist_validate
550 : * Process and validate the 'columns' list and ensure the columns are all
551 : * valid to use for a publication. Checks for and raises an ERROR for
552 : * any unknown columns, system columns, duplicate columns, or virtual
553 : * generated columns.
554 : *
555 : * Looks up each column's attnum and returns a 0-based Bitmapset of the
556 : * corresponding attnums.
557 : */
558 : Bitmapset *
559 1500 : pub_collist_validate(Relation targetrel, List *columns)
560 : {
561 1500 : Bitmapset *set = NULL;
562 : ListCell *lc;
563 1500 : TupleDesc tupdesc = RelationGetDescr(targetrel);
564 :
565 2344 : foreach(lc, columns)
566 : {
567 880 : char *colname = strVal(lfirst(lc));
568 880 : AttrNumber attnum = get_attnum(RelationGetRelid(targetrel), colname);
569 :
570 880 : if (attnum == InvalidAttrNumber)
571 6 : ereport(ERROR,
572 : errcode(ERRCODE_UNDEFINED_COLUMN),
573 : errmsg("column \"%s\" of relation \"%s\" does not exist",
574 : colname, RelationGetRelationName(targetrel)));
575 :
576 874 : if (!AttrNumberIsForUserDefinedAttr(attnum))
577 12 : ereport(ERROR,
578 : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
579 : errmsg("cannot use system column \"%s\" in publication column list",
580 : colname));
581 :
582 862 : if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
583 6 : ereport(ERROR,
584 : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
585 : errmsg("cannot use virtual generated column \"%s\" in publication column list",
586 : colname));
587 :
588 856 : if (bms_is_member(attnum, set))
589 12 : ereport(ERROR,
590 : errcode(ERRCODE_DUPLICATE_OBJECT),
591 : errmsg("duplicate column \"%s\" in publication column list",
592 : colname));
593 :
594 844 : set = bms_add_member(set, attnum);
595 : }
596 :
597 1464 : return set;
598 : }
599 :
600 : /*
601 : * Transform a column list (represented by an array Datum) to a bitmapset.
602 : *
603 : * If columns isn't NULL, add the column numbers to that set.
604 : *
605 : * If mcxt isn't NULL, build the bitmapset in that context.
606 : */
607 : Bitmapset *
608 444 : pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
609 : {
610 444 : Bitmapset *result = columns;
611 : ArrayType *arr;
612 : int nelems;
613 : int16 *elems;
614 444 : MemoryContext oldcxt = NULL;
615 :
616 444 : arr = DatumGetArrayTypeP(pubcols);
617 444 : nelems = ARR_DIMS(arr)[0];
618 444 : elems = (int16 *) ARR_DATA_PTR(arr);
619 :
620 : /* If a memory context was specified, switch to it. */
621 444 : if (mcxt)
622 78 : oldcxt = MemoryContextSwitchTo(mcxt);
623 :
624 1226 : for (int i = 0; i < nelems; i++)
625 782 : result = bms_add_member(result, elems[i]);
626 :
627 444 : if (mcxt)
628 78 : MemoryContextSwitchTo(oldcxt);
629 :
630 444 : return result;
631 : }
632 :
633 : /*
634 : * Returns a bitmap representing the columns of the specified table.
635 : *
636 : * Generated columns are included if include_gencols_type is
637 : * PUBLISH_GENCOLS_STORED.
638 : */
639 : Bitmapset *
640 18 : pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type)
641 : {
642 18 : Bitmapset *result = NULL;
643 18 : TupleDesc desc = RelationGetDescr(relation);
644 :
645 60 : for (int i = 0; i < desc->natts; i++)
646 : {
647 42 : Form_pg_attribute att = TupleDescAttr(desc, i);
648 :
649 42 : if (att->attisdropped)
650 2 : continue;
651 :
652 40 : if (att->attgenerated)
653 : {
654 : /* We only support replication of STORED generated cols. */
655 4 : if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
656 2 : continue;
657 :
658 : /* User hasn't requested to replicate STORED generated cols. */
659 2 : if (include_gencols_type != PUBLISH_GENCOLS_STORED)
660 2 : continue;
661 : }
662 :
663 36 : result = bms_add_member(result, att->attnum);
664 : }
665 :
666 18 : return result;
667 : }
668 :
669 : /*
670 : * Insert new publication / schema mapping.
671 : */
672 : ObjectAddress
673 262 : publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
674 : {
675 : Relation rel;
676 : HeapTuple tup;
677 : Datum values[Natts_pg_publication_namespace];
678 : bool nulls[Natts_pg_publication_namespace];
679 : Oid psschid;
680 262 : Publication *pub = GetPublication(pubid);
681 262 : List *schemaRels = NIL;
682 : ObjectAddress myself,
683 : referenced;
684 :
685 262 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
686 :
687 : /*
688 : * Check for duplicates. Note that this does not really prevent
689 : * duplicates, it's here just to provide nicer error message in common
690 : * case. The real protection is the unique key on the catalog.
691 : */
692 262 : if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
693 : ObjectIdGetDatum(schemaid),
694 : ObjectIdGetDatum(pubid)))
695 : {
696 18 : table_close(rel, RowExclusiveLock);
697 :
698 18 : if (if_not_exists)
699 12 : return InvalidObjectAddress;
700 :
701 6 : ereport(ERROR,
702 : (errcode(ERRCODE_DUPLICATE_OBJECT),
703 : errmsg("schema \"%s\" is already member of publication \"%s\"",
704 : get_namespace_name(schemaid), pub->name)));
705 : }
706 :
707 244 : check_publication_add_schema(schemaid);
708 :
709 : /* Form a tuple */
710 238 : memset(values, 0, sizeof(values));
711 238 : memset(nulls, false, sizeof(nulls));
712 :
713 238 : psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId,
714 : Anum_pg_publication_namespace_oid);
715 238 : values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid);
716 238 : values[Anum_pg_publication_namespace_pnpubid - 1] =
717 238 : ObjectIdGetDatum(pubid);
718 238 : values[Anum_pg_publication_namespace_pnnspid - 1] =
719 238 : ObjectIdGetDatum(schemaid);
720 :
721 238 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
722 :
723 : /* Insert tuple into catalog */
724 238 : CatalogTupleInsert(rel, tup);
725 238 : heap_freetuple(tup);
726 :
727 238 : ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid);
728 :
729 : /* Add dependency on the publication */
730 238 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
731 238 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
732 :
733 : /* Add dependency on the schema */
734 238 : ObjectAddressSet(referenced, NamespaceRelationId, schemaid);
735 238 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
736 :
737 : /* Close the table */
738 238 : table_close(rel, RowExclusiveLock);
739 :
740 : /*
741 : * Invalidate relcache so that publication info is rebuilt. See
742 : * publication_add_relation for why we need to consider all the
743 : * partitions.
744 : */
745 238 : schemaRels = GetSchemaPublicationRelations(schemaid,
746 : PUBLICATION_PART_ALL);
747 238 : InvalidatePublicationRels(schemaRels);
748 :
749 238 : return myself;
750 : }
751 :
752 : /* Gets list of publication oids for a relation */
753 : List *
754 13386 : GetRelationPublications(Oid relid)
755 : {
756 13386 : List *result = NIL;
757 : CatCList *pubrellist;
758 : int i;
759 :
760 : /* Find all publications associated with the relation. */
761 13386 : pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
762 : ObjectIdGetDatum(relid));
763 15038 : for (i = 0; i < pubrellist->n_members; i++)
764 : {
765 1652 : HeapTuple tup = &pubrellist->members[i]->tuple;
766 1652 : Oid pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
767 :
768 1652 : result = lappend_oid(result, pubid);
769 : }
770 :
771 13386 : ReleaseSysCacheList(pubrellist);
772 :
773 13386 : return result;
774 : }
775 :
776 : /*
777 : * Gets list of relation oids for a publication.
778 : *
779 : * This should only be used FOR TABLE publications, the FOR ALL TABLES/SEQUENCES
780 : * should use GetAllPublicationRelations().
781 : */
782 : List *
783 2354 : GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
784 : {
785 : List *result;
786 : Relation pubrelsrel;
787 : ScanKeyData scankey;
788 : SysScanDesc scan;
789 : HeapTuple tup;
790 :
791 : /* Find all publications associated with the relation. */
792 2354 : pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
793 :
794 2354 : ScanKeyInit(&scankey,
795 : Anum_pg_publication_rel_prpubid,
796 : BTEqualStrategyNumber, F_OIDEQ,
797 : ObjectIdGetDatum(pubid));
798 :
799 2354 : scan = systable_beginscan(pubrelsrel, PublicationRelPrpubidIndexId,
800 : true, NULL, 1, &scankey);
801 :
802 2354 : result = NIL;
803 5448 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
804 : {
805 : Form_pg_publication_rel pubrel;
806 :
807 3094 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
808 3094 : result = GetPubPartitionOptionRelations(result, pub_partopt,
809 : pubrel->prrelid);
810 : }
811 :
812 2354 : systable_endscan(scan);
813 2354 : table_close(pubrelsrel, AccessShareLock);
814 :
815 : /* Now sort and de-duplicate the result list */
816 2354 : list_sort(result, list_oid_cmp);
817 2354 : list_deduplicate_oid(result);
818 :
819 2354 : return result;
820 : }
821 :
822 : /*
823 : * Gets list of publication oids for publications marked as FOR ALL TABLES.
824 : */
825 : List *
826 9064 : GetAllTablesPublications(void)
827 : {
828 : List *result;
829 : Relation rel;
830 : ScanKeyData scankey;
831 : SysScanDesc scan;
832 : HeapTuple tup;
833 :
834 : /* Find all publications that are marked as for all tables. */
835 9064 : rel = table_open(PublicationRelationId, AccessShareLock);
836 :
837 9064 : ScanKeyInit(&scankey,
838 : Anum_pg_publication_puballtables,
839 : BTEqualStrategyNumber, F_BOOLEQ,
840 : BoolGetDatum(true));
841 :
842 9064 : scan = systable_beginscan(rel, InvalidOid, false,
843 : NULL, 1, &scankey);
844 :
845 9064 : result = NIL;
846 9280 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
847 : {
848 216 : Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
849 :
850 216 : result = lappend_oid(result, oid);
851 : }
852 :
853 9064 : systable_endscan(scan);
854 9064 : table_close(rel, AccessShareLock);
855 :
856 9064 : return result;
857 : }
858 :
859 : /*
860 : * Gets list of all relations published by FOR ALL TABLES/SEQUENCES
861 : * publication(s).
862 : *
863 : * If the publication publishes partition changes via their respective root
864 : * partitioned tables, we must exclude partitions in favor of including the
865 : * root partitioned tables. This is not applicable to FOR ALL SEQUENCES
866 : * publication.
867 : */
868 : List *
869 344 : GetAllPublicationRelations(char relkind, bool pubviaroot)
870 : {
871 : Relation classRel;
872 : ScanKeyData key[1];
873 : TableScanDesc scan;
874 : HeapTuple tuple;
875 344 : List *result = NIL;
876 :
877 : Assert(!(relkind == RELKIND_SEQUENCE && pubviaroot));
878 :
879 344 : classRel = table_open(RelationRelationId, AccessShareLock);
880 :
881 344 : ScanKeyInit(&key[0],
882 : Anum_pg_class_relkind,
883 : BTEqualStrategyNumber, F_CHAREQ,
884 : CharGetDatum(relkind));
885 :
886 344 : scan = table_beginscan_catalog(classRel, 1, key);
887 :
888 25338 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
889 : {
890 24994 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
891 24994 : Oid relid = relForm->oid;
892 :
893 24994 : if (is_publishable_class(relid, relForm) &&
894 1590 : !(relForm->relispartition && pubviaroot))
895 1408 : result = lappend_oid(result, relid);
896 : }
897 :
898 344 : table_endscan(scan);
899 :
900 344 : if (pubviaroot)
901 : {
902 26 : ScanKeyInit(&key[0],
903 : Anum_pg_class_relkind,
904 : BTEqualStrategyNumber, F_CHAREQ,
905 : CharGetDatum(RELKIND_PARTITIONED_TABLE));
906 :
907 26 : scan = table_beginscan_catalog(classRel, 1, key);
908 :
909 156 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
910 : {
911 130 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
912 130 : Oid relid = relForm->oid;
913 :
914 130 : if (is_publishable_class(relid, relForm) &&
915 130 : !relForm->relispartition)
916 104 : result = lappend_oid(result, relid);
917 : }
918 :
919 26 : table_endscan(scan);
920 : }
921 :
922 344 : table_close(classRel, AccessShareLock);
923 344 : return result;
924 : }
925 :
926 : /*
927 : * Gets the list of schema oids for a publication.
928 : *
929 : * This should only be used FOR TABLES IN SCHEMA publications.
930 : */
931 : List *
932 2262 : GetPublicationSchemas(Oid pubid)
933 : {
934 2262 : List *result = NIL;
935 : Relation pubschsrel;
936 : ScanKeyData scankey;
937 : SysScanDesc scan;
938 : HeapTuple tup;
939 :
940 : /* Find all schemas associated with the publication */
941 2262 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
942 :
943 2262 : ScanKeyInit(&scankey,
944 : Anum_pg_publication_namespace_pnpubid,
945 : BTEqualStrategyNumber, F_OIDEQ,
946 : ObjectIdGetDatum(pubid));
947 :
948 2262 : scan = systable_beginscan(pubschsrel,
949 : PublicationNamespacePnnspidPnpubidIndexId,
950 : true, NULL, 1, &scankey);
951 2362 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
952 : {
953 : Form_pg_publication_namespace pubsch;
954 :
955 100 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
956 :
957 100 : result = lappend_oid(result, pubsch->pnnspid);
958 : }
959 :
960 2262 : systable_endscan(scan);
961 2262 : table_close(pubschsrel, AccessShareLock);
962 :
963 2262 : return result;
964 : }
965 :
966 : /*
967 : * Gets the list of publication oids associated with a specified schema.
968 : */
969 : List *
970 12928 : GetSchemaPublications(Oid schemaid)
971 : {
972 12928 : List *result = NIL;
973 : CatCList *pubschlist;
974 : int i;
975 :
976 : /* Find all publications associated with the schema */
977 12928 : pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
978 : ObjectIdGetDatum(schemaid));
979 13042 : for (i = 0; i < pubschlist->n_members; i++)
980 : {
981 114 : HeapTuple tup = &pubschlist->members[i]->tuple;
982 114 : Oid pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
983 :
984 114 : result = lappend_oid(result, pubid);
985 : }
986 :
987 12928 : ReleaseSysCacheList(pubschlist);
988 :
989 12928 : return result;
990 : }
991 :
992 : /*
993 : * Get the list of publishable relation oids for a specified schema.
994 : */
995 : List *
996 500 : GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
997 : {
998 : Relation classRel;
999 : ScanKeyData key[1];
1000 : TableScanDesc scan;
1001 : HeapTuple tuple;
1002 500 : List *result = NIL;
1003 :
1004 : Assert(OidIsValid(schemaid));
1005 :
1006 500 : classRel = table_open(RelationRelationId, AccessShareLock);
1007 :
1008 500 : ScanKeyInit(&key[0],
1009 : Anum_pg_class_relnamespace,
1010 : BTEqualStrategyNumber, F_OIDEQ,
1011 : ObjectIdGetDatum(schemaid));
1012 :
1013 : /* get all the relations present in the specified schema */
1014 500 : scan = table_beginscan_catalog(classRel, 1, key);
1015 25732 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1016 : {
1017 25232 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
1018 25232 : Oid relid = relForm->oid;
1019 : char relkind;
1020 :
1021 25232 : if (!is_publishable_class(relid, relForm))
1022 8656 : continue;
1023 :
1024 16576 : relkind = get_rel_relkind(relid);
1025 16576 : if (relkind == RELKIND_RELATION)
1026 14206 : result = lappend_oid(result, relid);
1027 2370 : else if (relkind == RELKIND_PARTITIONED_TABLE)
1028 : {
1029 782 : List *partitionrels = NIL;
1030 :
1031 : /*
1032 : * It is quite possible that some of the partitions are in a
1033 : * different schema than the parent table, so we need to get such
1034 : * partitions separately.
1035 : */
1036 782 : partitionrels = GetPubPartitionOptionRelations(partitionrels,
1037 : pub_partopt,
1038 : relForm->oid);
1039 782 : result = list_concat_unique_oid(result, partitionrels);
1040 : }
1041 : }
1042 :
1043 500 : table_endscan(scan);
1044 500 : table_close(classRel, AccessShareLock);
1045 500 : return result;
1046 : }
1047 :
1048 : /*
1049 : * Gets the list of all relations published by FOR TABLES IN SCHEMA
1050 : * publication.
1051 : */
1052 : List *
1053 1858 : GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
1054 : {
1055 1858 : List *result = NIL;
1056 1858 : List *pubschemalist = GetPublicationSchemas(pubid);
1057 : ListCell *cell;
1058 :
1059 1928 : foreach(cell, pubschemalist)
1060 : {
1061 70 : Oid schemaid = lfirst_oid(cell);
1062 70 : List *schemaRels = NIL;
1063 :
1064 70 : schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt);
1065 70 : result = list_concat(result, schemaRels);
1066 : }
1067 :
1068 1858 : return result;
1069 : }
1070 :
1071 : /*
1072 : * Get publication using oid
1073 : *
1074 : * The Publication struct and its data are palloc'ed here.
1075 : */
1076 : Publication *
1077 9082 : GetPublication(Oid pubid)
1078 : {
1079 : HeapTuple tup;
1080 : Publication *pub;
1081 : Form_pg_publication pubform;
1082 :
1083 9082 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1084 9082 : if (!HeapTupleIsValid(tup))
1085 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1086 :
1087 9082 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1088 :
1089 9082 : pub = (Publication *) palloc(sizeof(Publication));
1090 9082 : pub->oid = pubid;
1091 9082 : pub->name = pstrdup(NameStr(pubform->pubname));
1092 9082 : pub->alltables = pubform->puballtables;
1093 9082 : pub->allsequences = pubform->puballsequences;
1094 9082 : pub->pubactions.pubinsert = pubform->pubinsert;
1095 9082 : pub->pubactions.pubupdate = pubform->pubupdate;
1096 9082 : pub->pubactions.pubdelete = pubform->pubdelete;
1097 9082 : pub->pubactions.pubtruncate = pubform->pubtruncate;
1098 9082 : pub->pubviaroot = pubform->pubviaroot;
1099 9082 : pub->pubgencols_type = pubform->pubgencols;
1100 :
1101 9082 : ReleaseSysCache(tup);
1102 :
1103 9082 : return pub;
1104 : }
1105 :
1106 : /*
1107 : * Get Publication using name.
1108 : */
1109 : Publication *
1110 2586 : GetPublicationByName(const char *pubname, bool missing_ok)
1111 : {
1112 : Oid oid;
1113 :
1114 2586 : oid = get_publication_oid(pubname, missing_ok);
1115 :
1116 2586 : return OidIsValid(oid) ? GetPublication(oid) : NULL;
1117 : }
1118 :
1119 : /*
1120 : * Get information of the tables in the given publication array.
1121 : *
1122 : * Returns pubid, relid, column list, row filter for each table.
1123 : */
1124 : Datum
1125 6132 : pg_get_publication_tables(PG_FUNCTION_ARGS)
1126 : {
1127 : #define NUM_PUBLICATION_TABLES_ELEM 4
1128 : FuncCallContext *funcctx;
1129 6132 : List *table_infos = NIL;
1130 :
1131 : /* stuff done only on the first call of the function */
1132 6132 : if (SRF_IS_FIRSTCALL())
1133 : {
1134 : TupleDesc tupdesc;
1135 : MemoryContext oldcontext;
1136 : ArrayType *arr;
1137 : Datum *elems;
1138 : int nelems,
1139 : i;
1140 1986 : bool viaroot = false;
1141 :
1142 : /* create a function context for cross-call persistence */
1143 1986 : funcctx = SRF_FIRSTCALL_INIT();
1144 :
1145 : /* switch to memory context appropriate for multiple function calls */
1146 1986 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1147 :
1148 : /*
1149 : * Deconstruct the parameter into elements where each element is a
1150 : * publication name.
1151 : */
1152 1986 : arr = PG_GETARG_ARRAYTYPE_P(0);
1153 1986 : deconstruct_array_builtin(arr, TEXTOID, &elems, NULL, &nelems);
1154 :
1155 : /* Get Oids of tables from each publication. */
1156 4062 : for (i = 0; i < nelems; i++)
1157 : {
1158 : Publication *pub_elem;
1159 2076 : List *pub_elem_tables = NIL;
1160 : ListCell *lc;
1161 :
1162 2076 : pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]), false);
1163 :
1164 : /*
1165 : * Publications support partitioned tables. If
1166 : * publish_via_partition_root is false, all changes are replicated
1167 : * using leaf partition identity and schema, so we only need
1168 : * those. Otherwise, get the partitioned table itself.
1169 : */
1170 2076 : if (pub_elem->alltables)
1171 344 : pub_elem_tables = GetAllPublicationRelations(RELKIND_RELATION,
1172 344 : pub_elem->pubviaroot);
1173 : else
1174 : {
1175 : List *relids,
1176 : *schemarelids;
1177 :
1178 1732 : relids = GetPublicationRelations(pub_elem->oid,
1179 1732 : pub_elem->pubviaroot ?
1180 1732 : PUBLICATION_PART_ROOT :
1181 : PUBLICATION_PART_LEAF);
1182 1732 : schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
1183 1732 : pub_elem->pubviaroot ?
1184 1732 : PUBLICATION_PART_ROOT :
1185 : PUBLICATION_PART_LEAF);
1186 1732 : pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
1187 : }
1188 :
1189 : /*
1190 : * Record the published table and the corresponding publication so
1191 : * that we can get row filters and column lists later.
1192 : *
1193 : * When a table is published by multiple publications, to obtain
1194 : * all row filters and column lists, the structure related to this
1195 : * table will be recorded multiple times.
1196 : */
1197 6302 : foreach(lc, pub_elem_tables)
1198 : {
1199 4226 : published_rel *table_info = (published_rel *) palloc(sizeof(published_rel));
1200 :
1201 4226 : table_info->relid = lfirst_oid(lc);
1202 4226 : table_info->pubid = pub_elem->oid;
1203 4226 : table_infos = lappend(table_infos, table_info);
1204 : }
1205 :
1206 : /* At least one publication is using publish_via_partition_root. */
1207 2076 : if (pub_elem->pubviaroot)
1208 368 : viaroot = true;
1209 : }
1210 :
1211 : /*
1212 : * If the publication publishes partition changes via their respective
1213 : * root partitioned tables, we must exclude partitions in favor of
1214 : * including the root partitioned tables. Otherwise, the function
1215 : * could return both the child and parent tables which could cause
1216 : * data of the child table to be double-published on the subscriber
1217 : * side.
1218 : */
1219 1986 : if (viaroot)
1220 352 : filter_partitions(table_infos);
1221 :
1222 : /* Construct a tuple descriptor for the result rows. */
1223 1986 : tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
1224 1986 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
1225 : OIDOID, -1, 0);
1226 1986 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
1227 : OIDOID, -1, 0);
1228 1986 : TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
1229 : INT2VECTOROID, -1, 0);
1230 1986 : TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
1231 : PG_NODE_TREEOID, -1, 0);
1232 :
1233 1986 : funcctx->tuple_desc = BlessTupleDesc(tupdesc);
1234 1986 : funcctx->user_fctx = table_infos;
1235 :
1236 1986 : MemoryContextSwitchTo(oldcontext);
1237 : }
1238 :
1239 : /* stuff done on every call of the function */
1240 6132 : funcctx = SRF_PERCALL_SETUP();
1241 6132 : table_infos = (List *) funcctx->user_fctx;
1242 :
1243 6132 : if (funcctx->call_cntr < list_length(table_infos))
1244 : {
1245 4146 : HeapTuple pubtuple = NULL;
1246 : HeapTuple rettuple;
1247 : Publication *pub;
1248 4146 : published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
1249 4146 : Oid relid = table_info->relid;
1250 4146 : Oid schemaid = get_rel_namespace(relid);
1251 4146 : Datum values[NUM_PUBLICATION_TABLES_ELEM] = {0};
1252 4146 : bool nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
1253 :
1254 : /*
1255 : * Form tuple with appropriate data.
1256 : */
1257 :
1258 4146 : pub = GetPublication(table_info->pubid);
1259 :
1260 4146 : values[0] = ObjectIdGetDatum(pub->oid);
1261 4146 : values[1] = ObjectIdGetDatum(relid);
1262 :
1263 : /*
1264 : * We don't consider row filters or column lists for FOR ALL TABLES or
1265 : * FOR TABLES IN SCHEMA publications.
1266 : */
1267 4146 : if (!pub->alltables &&
1268 2634 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
1269 : ObjectIdGetDatum(schemaid),
1270 : ObjectIdGetDatum(pub->oid)))
1271 2540 : pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
1272 : ObjectIdGetDatum(relid),
1273 : ObjectIdGetDatum(pub->oid));
1274 :
1275 4146 : if (HeapTupleIsValid(pubtuple))
1276 : {
1277 : /* Lookup the column list attribute. */
1278 2314 : values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1279 : Anum_pg_publication_rel_prattrs,
1280 : &(nulls[2]));
1281 :
1282 : /* Null indicates no filter. */
1283 2314 : values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1284 : Anum_pg_publication_rel_prqual,
1285 : &(nulls[3]));
1286 : }
1287 : else
1288 : {
1289 1832 : nulls[2] = true;
1290 1832 : nulls[3] = true;
1291 : }
1292 :
1293 : /* Show all columns when the column list is not specified. */
1294 4146 : if (nulls[2])
1295 : {
1296 3894 : Relation rel = table_open(relid, AccessShareLock);
1297 3894 : int nattnums = 0;
1298 : int16 *attnums;
1299 3894 : TupleDesc desc = RelationGetDescr(rel);
1300 : int i;
1301 :
1302 3894 : attnums = (int16 *) palloc(desc->natts * sizeof(int16));
1303 :
1304 10698 : for (i = 0; i < desc->natts; i++)
1305 : {
1306 6804 : Form_pg_attribute att = TupleDescAttr(desc, i);
1307 :
1308 6804 : if (att->attisdropped)
1309 12 : continue;
1310 :
1311 6792 : if (att->attgenerated)
1312 : {
1313 : /* We only support replication of STORED generated cols. */
1314 96 : if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
1315 72 : continue;
1316 :
1317 : /*
1318 : * User hasn't requested to replicate STORED generated
1319 : * cols.
1320 : */
1321 24 : if (pub->pubgencols_type != PUBLISH_GENCOLS_STORED)
1322 18 : continue;
1323 : }
1324 :
1325 6702 : attnums[nattnums++] = att->attnum;
1326 : }
1327 :
1328 3894 : if (nattnums > 0)
1329 : {
1330 3850 : values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
1331 3850 : nulls[2] = false;
1332 : }
1333 :
1334 3894 : table_close(rel, AccessShareLock);
1335 : }
1336 :
1337 4146 : rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
1338 :
1339 4146 : SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
1340 : }
1341 :
1342 1986 : SRF_RETURN_DONE(funcctx);
1343 : }
1344 :
1345 : /*
1346 : * Returns Oids of sequences in a publication.
1347 : */
1348 : Datum
1349 0 : pg_get_publication_sequences(PG_FUNCTION_ARGS)
1350 : {
1351 : FuncCallContext *funcctx;
1352 0 : List *sequences = NIL;
1353 :
1354 : /* stuff done only on the first call of the function */
1355 0 : if (SRF_IS_FIRSTCALL())
1356 : {
1357 0 : char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1358 : Publication *publication;
1359 : MemoryContext oldcontext;
1360 :
1361 : /* create a function context for cross-call persistence */
1362 0 : funcctx = SRF_FIRSTCALL_INIT();
1363 :
1364 : /* switch to memory context appropriate for multiple function calls */
1365 0 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1366 :
1367 0 : publication = GetPublicationByName(pubname, false);
1368 :
1369 0 : if (publication->allsequences)
1370 0 : sequences = GetAllPublicationRelations(RELKIND_SEQUENCE, false);
1371 :
1372 0 : funcctx->user_fctx = (void *) sequences;
1373 :
1374 0 : MemoryContextSwitchTo(oldcontext);
1375 : }
1376 :
1377 : /* stuff done on every call of the function */
1378 0 : funcctx = SRF_PERCALL_SETUP();
1379 0 : sequences = (List *) funcctx->user_fctx;
1380 :
1381 0 : if (funcctx->call_cntr < list_length(sequences))
1382 : {
1383 0 : Oid relid = list_nth_oid(sequences, funcctx->call_cntr);
1384 :
1385 0 : SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
1386 : }
1387 :
1388 0 : SRF_RETURN_DONE(funcctx);
1389 : }
|