Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_publication.c
4 : * publication C API manipulation
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/catalog/pg_publication.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/heapam.h"
19 : #include "access/htup_details.h"
20 : #include "access/tableam.h"
21 : #include "catalog/catalog.h"
22 : #include "catalog/dependency.h"
23 : #include "catalog/indexing.h"
24 : #include "catalog/namespace.h"
25 : #include "catalog/objectaddress.h"
26 : #include "catalog/partition.h"
27 : #include "catalog/pg_inherits.h"
28 : #include "catalog/pg_namespace.h"
29 : #include "catalog/pg_publication.h"
30 : #include "catalog/pg_publication_namespace.h"
31 : #include "catalog/pg_publication_rel.h"
32 : #include "catalog/pg_type.h"
33 : #include "commands/publicationcmds.h"
34 : #include "funcapi.h"
35 : #include "utils/array.h"
36 : #include "utils/builtins.h"
37 : #include "utils/catcache.h"
38 : #include "utils/fmgroids.h"
39 : #include "utils/lsyscache.h"
40 : #include "utils/rel.h"
41 : #include "utils/syscache.h"
42 :
43 : /* Records association between publication and published table */
44 : typedef struct
45 : {
46 : Oid relid; /* OID of published table */
47 : Oid pubid; /* OID of publication that publishes this
48 : * table. */
49 : } published_rel;
50 :
51 : /*
52 : * Check if relation can be in given publication and throws appropriate
53 : * error if not.
54 : */
55 : static void
56 1070 : check_publication_add_relation(Relation targetrel)
57 : {
58 : /* Must be a regular or partitioned table */
59 1070 : if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
60 144 : RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
61 14 : ereport(ERROR,
62 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
63 : errmsg("cannot add relation \"%s\" to publication",
64 : RelationGetRelationName(targetrel)),
65 : errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind)));
66 :
67 : /* Can't be system table */
68 1056 : if (IsCatalogRelation(targetrel))
69 6 : ereport(ERROR,
70 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
71 : errmsg("cannot add relation \"%s\" to publication",
72 : RelationGetRelationName(targetrel)),
73 : errdetail("This operation is not supported for system tables.")));
74 :
75 : /* UNLOGGED and TEMP relations cannot be part of publication. */
76 1050 : if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
77 6 : ereport(ERROR,
78 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
79 : errmsg("cannot add relation \"%s\" to publication",
80 : RelationGetRelationName(targetrel)),
81 : errdetail("This operation is not supported for temporary tables.")));
82 1044 : else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
83 6 : ereport(ERROR,
84 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
85 : errmsg("cannot add relation \"%s\" to publication",
86 : RelationGetRelationName(targetrel)),
87 : errdetail("This operation is not supported for unlogged tables.")));
88 1038 : }
89 :
90 : /*
91 : * Check if schema can be in given publication and throw appropriate error if
92 : * not.
93 : */
94 : static void
95 204 : check_publication_add_schema(Oid schemaid)
96 : {
97 : /* Can't be system namespace */
98 204 : if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid))
99 6 : ereport(ERROR,
100 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
101 : errmsg("cannot add schema \"%s\" to publication",
102 : get_namespace_name(schemaid)),
103 : errdetail("This operation is not supported for system schemas.")));
104 :
105 : /* Can't be temporary namespace */
106 198 : if (isAnyTempNamespace(schemaid))
107 0 : ereport(ERROR,
108 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
109 : errmsg("cannot add schema \"%s\" to publication",
110 : get_namespace_name(schemaid)),
111 : errdetail("Temporary schemas cannot be replicated.")));
112 198 : }
113 :
114 : /*
115 : * Returns if relation represented by oid and Form_pg_class entry
116 : * is publishable.
117 : *
118 : * Does same checks as check_publication_add_relation() above, but does not
119 : * need relation to be opened and also does not throw errors.
120 : *
121 : * XXX This also excludes all tables with relid < FirstNormalObjectId,
122 : * ie all tables created during initdb. This mainly affects the preinstalled
123 : * information_schema. IsCatalogRelationOid() only excludes tables with
124 : * relid < FirstUnpinnedObjectId, making that test rather redundant,
125 : * but really we should get rid of the FirstNormalObjectId test not
126 : * IsCatalogRelationOid. We can't do so today because we don't want
127 : * information_schema tables to be considered publishable; but this test
128 : * is really inadequate for that, since the information_schema could be
129 : * dropped and reloaded and then it'll be considered publishable. The best
130 : * long-term solution may be to add a "relispublishable" bool to pg_class,
131 : * and depend on that instead of OID checks.
132 : */
133 : static bool
134 593930 : is_publishable_class(Oid relid, Form_pg_class reltuple)
135 : {
136 605942 : return (reltuple->relkind == RELKIND_RELATION ||
137 12012 : reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
138 583350 : !IsCatalogRelationOid(relid) &&
139 1187860 : reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
140 : relid >= FirstNormalObjectId;
141 : }
142 :
143 : /*
144 : * Another variant of is_publishable_class(), taking a Relation.
145 : */
146 : bool
147 539148 : is_publishable_relation(Relation rel)
148 : {
149 539148 : return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
150 : }
151 :
152 : /*
153 : * SQL-callable variant of the above
154 : *
155 : * This returns null when the relation does not exist. This is intended to be
156 : * used for example in psql to avoid gratuitous errors when there are
157 : * concurrent catalog changes.
158 : */
159 : Datum
160 5960 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
161 : {
162 5960 : Oid relid = PG_GETARG_OID(0);
163 : HeapTuple tuple;
164 : bool result;
165 :
166 5960 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
167 5960 : if (!HeapTupleIsValid(tuple))
168 0 : PG_RETURN_NULL();
169 5960 : result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
170 5960 : ReleaseSysCache(tuple);
171 5960 : PG_RETURN_BOOL(result);
172 : }
173 :
174 : /*
175 : * Returns true if the ancestor is in the list of published relations.
176 : * Otherwise, returns false.
177 : */
178 : static bool
179 202 : is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
180 : {
181 : ListCell *lc;
182 :
183 690 : foreach(lc, table_infos)
184 : {
185 568 : Oid relid = ((published_rel *) lfirst(lc))->relid;
186 :
187 568 : if (relid == ancestor)
188 80 : return true;
189 : }
190 :
191 122 : return false;
192 : }
193 :
194 : /*
195 : * Filter out the partitions whose parent tables are also present in the list.
196 : */
197 : static void
198 352 : filter_partitions(List *table_infos)
199 : {
200 : ListCell *lc;
201 :
202 1034 : foreach(lc, table_infos)
203 : {
204 682 : bool skip = false;
205 682 : List *ancestors = NIL;
206 : ListCell *lc2;
207 682 : published_rel *table_info = (published_rel *) lfirst(lc);
208 :
209 682 : if (get_rel_relispartition(table_info->relid))
210 202 : ancestors = get_partition_ancestors(table_info->relid);
211 :
212 804 : foreach(lc2, ancestors)
213 : {
214 202 : Oid ancestor = lfirst_oid(lc2);
215 :
216 202 : if (is_ancestor_member_tableinfos(ancestor, table_infos))
217 : {
218 80 : skip = true;
219 80 : break;
220 : }
221 : }
222 :
223 682 : if (skip)
224 80 : table_infos = foreach_delete_current(table_infos, lc);
225 : }
226 352 : }
227 :
228 : /*
229 : * Returns true if any schema is associated with the publication, false if no
230 : * schema is associated with the publication.
231 : */
232 : bool
233 268 : is_schema_publication(Oid pubid)
234 : {
235 : Relation pubschsrel;
236 : ScanKeyData scankey;
237 : SysScanDesc scan;
238 : HeapTuple tup;
239 268 : bool result = false;
240 :
241 268 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
242 268 : ScanKeyInit(&scankey,
243 : Anum_pg_publication_namespace_pnpubid,
244 : BTEqualStrategyNumber, F_OIDEQ,
245 : ObjectIdGetDatum(pubid));
246 :
247 268 : scan = systable_beginscan(pubschsrel,
248 : PublicationNamespacePnnspidPnpubidIndexId,
249 : true, NULL, 1, &scankey);
250 268 : tup = systable_getnext(scan);
251 268 : result = HeapTupleIsValid(tup);
252 :
253 268 : systable_endscan(scan);
254 268 : table_close(pubschsrel, AccessShareLock);
255 :
256 268 : return result;
257 : }
258 :
259 : /*
260 : * Returns true if the relation has column list associated with the
261 : * publication, false otherwise.
262 : *
263 : * If a column list is found, the corresponding bitmap is returned through the
264 : * cols parameter, if provided. The bitmap is constructed within the given
265 : * memory context (mcxt).
266 : */
267 : bool
268 1482 : check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt,
269 : Bitmapset **cols)
270 : {
271 : HeapTuple cftuple;
272 1482 : bool found = false;
273 :
274 1482 : if (pub->alltables)
275 354 : return false;
276 :
277 1128 : cftuple = SearchSysCache2(PUBLICATIONRELMAP,
278 : ObjectIdGetDatum(relid),
279 : ObjectIdGetDatum(pub->oid));
280 1128 : if (HeapTupleIsValid(cftuple))
281 : {
282 : Datum cfdatum;
283 : bool isnull;
284 :
285 : /* Lookup the column list attribute. */
286 1026 : cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
287 : Anum_pg_publication_rel_prattrs, &isnull);
288 :
289 : /* Was a column list found? */
290 1026 : if (!isnull)
291 : {
292 : /* Build the column list bitmap in the given memory context. */
293 314 : if (cols)
294 308 : *cols = pub_collist_to_bitmapset(*cols, cfdatum, mcxt);
295 :
296 314 : found = true;
297 : }
298 :
299 1026 : ReleaseSysCache(cftuple);
300 : }
301 :
302 1128 : return found;
303 : }
304 :
305 : /*
306 : * Gets the relations based on the publication partition option for a specified
307 : * relation.
308 : */
309 : List *
310 5646 : GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
311 : Oid relid)
312 : {
313 5646 : if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
314 : pub_partopt != PUBLICATION_PART_ROOT)
315 1140 : {
316 1140 : List *all_parts = find_all_inheritors(relid, NoLock,
317 : NULL);
318 :
319 1140 : if (pub_partopt == PUBLICATION_PART_ALL)
320 938 : result = list_concat(result, all_parts);
321 202 : else if (pub_partopt == PUBLICATION_PART_LEAF)
322 : {
323 : ListCell *lc;
324 :
325 738 : foreach(lc, all_parts)
326 : {
327 536 : Oid partOid = lfirst_oid(lc);
328 :
329 536 : if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
330 334 : result = lappend_oid(result, partOid);
331 : }
332 : }
333 : else
334 : Assert(false);
335 : }
336 : else
337 4506 : result = lappend_oid(result, relid);
338 :
339 5646 : return result;
340 : }
341 :
342 : /*
343 : * Returns the relid of the topmost ancestor that is published via this
344 : * publication if any and set its ancestor level to ancestor_level,
345 : * otherwise returns InvalidOid.
346 : *
347 : * The ancestor_level value allows us to compare the results for multiple
348 : * publications, and decide which value is higher up.
349 : *
350 : * Note that the list of ancestors should be ordered such that the topmost
351 : * ancestor is at the end of the list.
352 : */
353 : Oid
354 510 : GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
355 : {
356 : ListCell *lc;
357 510 : Oid topmost_relid = InvalidOid;
358 510 : int level = 0;
359 :
360 : /*
361 : * Find the "topmost" ancestor that is in this publication.
362 : */
363 1036 : foreach(lc, ancestors)
364 : {
365 526 : Oid ancestor = lfirst_oid(lc);
366 526 : List *apubids = GetRelationPublications(ancestor);
367 526 : List *aschemaPubids = NIL;
368 :
369 526 : level++;
370 :
371 526 : if (list_member_oid(apubids, puboid))
372 : {
373 312 : topmost_relid = ancestor;
374 :
375 312 : if (ancestor_level)
376 86 : *ancestor_level = level;
377 : }
378 : else
379 : {
380 214 : aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
381 214 : if (list_member_oid(aschemaPubids, puboid))
382 : {
383 10 : topmost_relid = ancestor;
384 :
385 10 : if (ancestor_level)
386 10 : *ancestor_level = level;
387 : }
388 : }
389 :
390 526 : list_free(apubids);
391 526 : list_free(aschemaPubids);
392 : }
393 :
394 510 : return topmost_relid;
395 : }
396 :
397 : /*
398 : * attnumstoint2vector
399 : * Convert a Bitmapset of AttrNumbers into an int2vector.
400 : *
401 : * AttrNumber numbers are 0-based, i.e., not offset by
402 : * FirstLowInvalidHeapAttributeNumber.
403 : */
404 : static int2vector *
405 308 : attnumstoint2vector(Bitmapset *attrs)
406 : {
407 : int2vector *result;
408 308 : int n = bms_num_members(attrs);
409 308 : int i = -1;
410 308 : int j = 0;
411 :
412 308 : result = buildint2vector(NULL, n);
413 :
414 848 : while ((i = bms_next_member(attrs, i)) >= 0)
415 : {
416 : Assert(i <= PG_INT16_MAX);
417 :
418 540 : result->values[j++] = (int16) i;
419 : }
420 :
421 308 : return result;
422 : }
423 :
424 : /*
425 : * Insert new publication / relation mapping.
426 : */
427 : ObjectAddress
428 1092 : publication_add_relation(Oid pubid, PublicationRelInfo *pri,
429 : bool if_not_exists)
430 : {
431 : Relation rel;
432 : HeapTuple tup;
433 : Datum values[Natts_pg_publication_rel];
434 : bool nulls[Natts_pg_publication_rel];
435 1092 : Relation targetrel = pri->relation;
436 1092 : Oid relid = RelationGetRelid(targetrel);
437 : Oid pubreloid;
438 : Bitmapset *attnums;
439 1092 : Publication *pub = GetPublication(pubid);
440 : ObjectAddress myself,
441 : referenced;
442 1092 : List *relids = NIL;
443 : int i;
444 :
445 1092 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
446 :
447 : /*
448 : * Check for duplicates. Note that this does not really prevent
449 : * duplicates, it's here just to provide nicer error message in common
450 : * case. The real protection is the unique key on the catalog.
451 : */
452 1092 : if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
453 : ObjectIdGetDatum(pubid)))
454 : {
455 22 : table_close(rel, RowExclusiveLock);
456 :
457 22 : if (if_not_exists)
458 16 : return InvalidObjectAddress;
459 :
460 6 : ereport(ERROR,
461 : (errcode(ERRCODE_DUPLICATE_OBJECT),
462 : errmsg("relation \"%s\" is already member of publication \"%s\"",
463 : RelationGetRelationName(targetrel), pub->name)));
464 : }
465 :
466 1070 : check_publication_add_relation(targetrel);
467 :
468 : /* Validate and translate column names into a Bitmapset of attnums. */
469 1038 : attnums = pub_collist_validate(pri->relation, pri->columns);
470 :
471 : /* Form a tuple. */
472 1014 : memset(values, 0, sizeof(values));
473 1014 : memset(nulls, false, sizeof(nulls));
474 :
475 1014 : pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
476 : Anum_pg_publication_rel_oid);
477 1014 : values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid);
478 1014 : values[Anum_pg_publication_rel_prpubid - 1] =
479 1014 : ObjectIdGetDatum(pubid);
480 1014 : values[Anum_pg_publication_rel_prrelid - 1] =
481 1014 : ObjectIdGetDatum(relid);
482 :
483 : /* Add qualifications, if available */
484 1014 : if (pri->whereClause != NULL)
485 306 : values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
486 : else
487 708 : nulls[Anum_pg_publication_rel_prqual - 1] = true;
488 :
489 : /* Add column list, if available */
490 1014 : if (pri->columns)
491 308 : values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(attnumstoint2vector(attnums));
492 : else
493 706 : nulls[Anum_pg_publication_rel_prattrs - 1] = true;
494 :
495 1014 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
496 :
497 : /* Insert tuple into catalog. */
498 1014 : CatalogTupleInsert(rel, tup);
499 1014 : heap_freetuple(tup);
500 :
501 : /* Register dependencies as needed */
502 1014 : ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
503 :
504 : /* Add dependency on the publication */
505 1014 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
506 1014 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
507 :
508 : /* Add dependency on the relation */
509 1014 : ObjectAddressSet(referenced, RelationRelationId, relid);
510 1014 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
511 :
512 : /* Add dependency on the objects mentioned in the qualifications */
513 1014 : if (pri->whereClause)
514 306 : recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
515 : DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
516 : false);
517 :
518 : /* Add dependency on the columns, if any are listed */
519 1014 : i = -1;
520 1554 : while ((i = bms_next_member(attnums, i)) >= 0)
521 : {
522 540 : ObjectAddressSubSet(referenced, RelationRelationId, relid, i);
523 540 : recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
524 : }
525 :
526 : /* Close the table. */
527 1014 : table_close(rel, RowExclusiveLock);
528 :
529 : /*
530 : * Invalidate relcache so that publication info is rebuilt.
531 : *
532 : * For the partitioned tables, we must invalidate all partitions contained
533 : * in the respective partition hierarchies, not just the one explicitly
534 : * mentioned in the publication. This is required because we implicitly
535 : * publish the child tables when the parent table is published.
536 : */
537 1014 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
538 : relid);
539 :
540 1014 : InvalidatePublicationRels(relids);
541 :
542 1014 : return myself;
543 : }
544 :
545 : /*
546 : * pub_collist_validate
547 : * Process and validate the 'columns' list and ensure the columns are all
548 : * valid to use for a publication. Checks for and raises an ERROR for
549 : * any unknown columns, system columns, duplicate columns, or virtual
550 : * generated columns.
551 : *
552 : * Looks up each column's attnum and returns a 0-based Bitmapset of the
553 : * corresponding attnums.
554 : */
555 : Bitmapset *
556 1442 : pub_collist_validate(Relation targetrel, List *columns)
557 : {
558 1442 : Bitmapset *set = NULL;
559 : ListCell *lc;
560 1442 : TupleDesc tupdesc = RelationGetDescr(targetrel);
561 :
562 2254 : foreach(lc, columns)
563 : {
564 848 : char *colname = strVal(lfirst(lc));
565 848 : AttrNumber attnum = get_attnum(RelationGetRelid(targetrel), colname);
566 :
567 848 : if (attnum == InvalidAttrNumber)
568 6 : ereport(ERROR,
569 : errcode(ERRCODE_UNDEFINED_COLUMN),
570 : errmsg("column \"%s\" of relation \"%s\" does not exist",
571 : colname, RelationGetRelationName(targetrel)));
572 :
573 842 : if (!AttrNumberIsForUserDefinedAttr(attnum))
574 12 : ereport(ERROR,
575 : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
576 : errmsg("cannot use system column \"%s\" in publication column list",
577 : colname));
578 :
579 830 : if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
580 6 : ereport(ERROR,
581 : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
582 : errmsg("cannot use virtual generated column \"%s\" in publication column list",
583 : colname));
584 :
585 824 : if (bms_is_member(attnum, set))
586 12 : ereport(ERROR,
587 : errcode(ERRCODE_DUPLICATE_OBJECT),
588 : errmsg("duplicate column \"%s\" in publication column list",
589 : colname));
590 :
591 812 : set = bms_add_member(set, attnum);
592 : }
593 :
594 1406 : return set;
595 : }
596 :
597 : /*
598 : * Transform a column list (represented by an array Datum) to a bitmapset.
599 : *
600 : * If columns isn't NULL, add the column numbers to that set.
601 : *
602 : * If mcxt isn't NULL, build the bitmapset in that context.
603 : */
604 : Bitmapset *
605 444 : pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
606 : {
607 444 : Bitmapset *result = columns;
608 : ArrayType *arr;
609 : int nelems;
610 : int16 *elems;
611 444 : MemoryContext oldcxt = NULL;
612 :
613 444 : arr = DatumGetArrayTypeP(pubcols);
614 444 : nelems = ARR_DIMS(arr)[0];
615 444 : elems = (int16 *) ARR_DATA_PTR(arr);
616 :
617 : /* If a memory context was specified, switch to it. */
618 444 : if (mcxt)
619 78 : oldcxt = MemoryContextSwitchTo(mcxt);
620 :
621 1226 : for (int i = 0; i < nelems; i++)
622 782 : result = bms_add_member(result, elems[i]);
623 :
624 444 : if (mcxt)
625 78 : MemoryContextSwitchTo(oldcxt);
626 :
627 444 : return result;
628 : }
629 :
630 : /*
631 : * Returns a bitmap representing the columns of the specified table.
632 : *
633 : * Generated columns are included if include_gencols_type is
634 : * PUBLISH_GENCOLS_STORED.
635 : */
636 : Bitmapset *
637 18 : pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type)
638 : {
639 18 : Bitmapset *result = NULL;
640 18 : TupleDesc desc = RelationGetDescr(relation);
641 :
642 60 : for (int i = 0; i < desc->natts; i++)
643 : {
644 42 : Form_pg_attribute att = TupleDescAttr(desc, i);
645 :
646 42 : if (att->attisdropped)
647 2 : continue;
648 :
649 40 : if (att->attgenerated)
650 : {
651 : /* We only support replication of STORED generated cols. */
652 4 : if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
653 2 : continue;
654 :
655 : /* User hasn't requested to replicate STORED generated cols. */
656 2 : if (include_gencols_type != PUBLISH_GENCOLS_STORED)
657 2 : continue;
658 : }
659 :
660 36 : result = bms_add_member(result, att->attnum);
661 : }
662 :
663 18 : return result;
664 : }
665 :
666 : /*
667 : * Insert new publication / schema mapping.
668 : */
669 : ObjectAddress
670 222 : publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
671 : {
672 : Relation rel;
673 : HeapTuple tup;
674 : Datum values[Natts_pg_publication_namespace];
675 : bool nulls[Natts_pg_publication_namespace];
676 : Oid psschid;
677 222 : Publication *pub = GetPublication(pubid);
678 222 : List *schemaRels = NIL;
679 : ObjectAddress myself,
680 : referenced;
681 :
682 222 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
683 :
684 : /*
685 : * Check for duplicates. Note that this does not really prevent
686 : * duplicates, it's here just to provide nicer error message in common
687 : * case. The real protection is the unique key on the catalog.
688 : */
689 222 : if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
690 : ObjectIdGetDatum(schemaid),
691 : ObjectIdGetDatum(pubid)))
692 : {
693 18 : table_close(rel, RowExclusiveLock);
694 :
695 18 : if (if_not_exists)
696 12 : return InvalidObjectAddress;
697 :
698 6 : ereport(ERROR,
699 : (errcode(ERRCODE_DUPLICATE_OBJECT),
700 : errmsg("schema \"%s\" is already member of publication \"%s\"",
701 : get_namespace_name(schemaid), pub->name)));
702 : }
703 :
704 204 : check_publication_add_schema(schemaid);
705 :
706 : /* Form a tuple */
707 198 : memset(values, 0, sizeof(values));
708 198 : memset(nulls, false, sizeof(nulls));
709 :
710 198 : psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId,
711 : Anum_pg_publication_namespace_oid);
712 198 : values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid);
713 198 : values[Anum_pg_publication_namespace_pnpubid - 1] =
714 198 : ObjectIdGetDatum(pubid);
715 198 : values[Anum_pg_publication_namespace_pnnspid - 1] =
716 198 : ObjectIdGetDatum(schemaid);
717 :
718 198 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
719 :
720 : /* Insert tuple into catalog */
721 198 : CatalogTupleInsert(rel, tup);
722 198 : heap_freetuple(tup);
723 :
724 198 : ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid);
725 :
726 : /* Add dependency on the publication */
727 198 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
728 198 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
729 :
730 : /* Add dependency on the schema */
731 198 : ObjectAddressSet(referenced, NamespaceRelationId, schemaid);
732 198 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
733 :
734 : /* Close the table */
735 198 : table_close(rel, RowExclusiveLock);
736 :
737 : /*
738 : * Invalidate relcache so that publication info is rebuilt. See
739 : * publication_add_relation for why we need to consider all the
740 : * partitions.
741 : */
742 198 : schemaRels = GetSchemaPublicationRelations(schemaid,
743 : PUBLICATION_PART_ALL);
744 198 : InvalidatePublicationRels(schemaRels);
745 :
746 198 : return myself;
747 : }
748 :
749 : /* Gets list of publication oids for a relation */
750 : List *
751 11914 : GetRelationPublications(Oid relid)
752 : {
753 11914 : List *result = NIL;
754 : CatCList *pubrellist;
755 : int i;
756 :
757 : /* Find all publications associated with the relation. */
758 11914 : pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
759 : ObjectIdGetDatum(relid));
760 13466 : for (i = 0; i < pubrellist->n_members; i++)
761 : {
762 1552 : HeapTuple tup = &pubrellist->members[i]->tuple;
763 1552 : Oid pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
764 :
765 1552 : result = lappend_oid(result, pubid);
766 : }
767 :
768 11914 : ReleaseSysCacheList(pubrellist);
769 :
770 11914 : return result;
771 : }
772 :
773 : /*
774 : * Gets list of relation oids for a publication.
775 : *
776 : * This should only be used FOR TABLE publications, the FOR ALL TABLES
777 : * should use GetAllTablesPublicationRelations().
778 : */
779 : List *
780 2260 : GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
781 : {
782 : List *result;
783 : Relation pubrelsrel;
784 : ScanKeyData scankey;
785 : SysScanDesc scan;
786 : HeapTuple tup;
787 :
788 : /* Find all publications associated with the relation. */
789 2260 : pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
790 :
791 2260 : ScanKeyInit(&scankey,
792 : Anum_pg_publication_rel_prpubid,
793 : BTEqualStrategyNumber, F_OIDEQ,
794 : ObjectIdGetDatum(pubid));
795 :
796 2260 : scan = systable_beginscan(pubrelsrel, PublicationRelPrpubidIndexId,
797 : true, NULL, 1, &scankey);
798 :
799 2260 : result = NIL;
800 5284 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
801 : {
802 : Form_pg_publication_rel pubrel;
803 :
804 3024 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
805 3024 : result = GetPubPartitionOptionRelations(result, pub_partopt,
806 : pubrel->prrelid);
807 : }
808 :
809 2260 : systable_endscan(scan);
810 2260 : table_close(pubrelsrel, AccessShareLock);
811 :
812 : /* Now sort and de-duplicate the result list */
813 2260 : list_sort(result, list_oid_cmp);
814 2260 : list_deduplicate_oid(result);
815 :
816 2260 : return result;
817 : }
818 :
819 : /*
820 : * Gets list of publication oids for publications marked as FOR ALL TABLES.
821 : */
822 : List *
823 8106 : GetAllTablesPublications(void)
824 : {
825 : List *result;
826 : Relation rel;
827 : ScanKeyData scankey;
828 : SysScanDesc scan;
829 : HeapTuple tup;
830 :
831 : /* Find all publications that are marked as for all tables. */
832 8106 : rel = table_open(PublicationRelationId, AccessShareLock);
833 :
834 8106 : ScanKeyInit(&scankey,
835 : Anum_pg_publication_puballtables,
836 : BTEqualStrategyNumber, F_BOOLEQ,
837 : BoolGetDatum(true));
838 :
839 8106 : scan = systable_beginscan(rel, InvalidOid, false,
840 : NULL, 1, &scankey);
841 :
842 8106 : result = NIL;
843 8298 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
844 : {
845 192 : Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
846 :
847 192 : result = lappend_oid(result, oid);
848 : }
849 :
850 8106 : systable_endscan(scan);
851 8106 : table_close(rel, AccessShareLock);
852 :
853 8106 : return result;
854 : }
855 :
856 : /*
857 : * Gets list of all relation published by FOR ALL TABLES publication(s).
858 : *
859 : * If the publication publishes partition changes via their respective root
860 : * partitioned tables, we must exclude partitions in favor of including the
861 : * root partitioned tables.
862 : */
863 : List *
864 332 : GetAllTablesPublicationRelations(bool pubviaroot)
865 : {
866 : Relation classRel;
867 : ScanKeyData key[1];
868 : TableScanDesc scan;
869 : HeapTuple tuple;
870 332 : List *result = NIL;
871 :
872 332 : classRel = table_open(RelationRelationId, AccessShareLock);
873 :
874 332 : ScanKeyInit(&key[0],
875 : Anum_pg_class_relkind,
876 : BTEqualStrategyNumber, F_CHAREQ,
877 : CharGetDatum(RELKIND_RELATION));
878 :
879 332 : scan = table_beginscan_catalog(classRel, 1, key);
880 :
881 24480 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
882 : {
883 24148 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
884 24148 : Oid relid = relForm->oid;
885 :
886 24148 : if (is_publishable_class(relid, relForm) &&
887 1560 : !(relForm->relispartition && pubviaroot))
888 1378 : result = lappend_oid(result, relid);
889 : }
890 :
891 332 : table_endscan(scan);
892 :
893 332 : if (pubviaroot)
894 : {
895 26 : ScanKeyInit(&key[0],
896 : Anum_pg_class_relkind,
897 : BTEqualStrategyNumber, F_CHAREQ,
898 : CharGetDatum(RELKIND_PARTITIONED_TABLE));
899 :
900 26 : scan = table_beginscan_catalog(classRel, 1, key);
901 :
902 156 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
903 : {
904 130 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
905 130 : Oid relid = relForm->oid;
906 :
907 130 : if (is_publishable_class(relid, relForm) &&
908 130 : !relForm->relispartition)
909 104 : result = lappend_oid(result, relid);
910 : }
911 :
912 26 : table_endscan(scan);
913 : }
914 :
915 332 : table_close(classRel, AccessShareLock);
916 332 : return result;
917 : }
918 :
919 : /*
920 : * Gets the list of schema oids for a publication.
921 : *
922 : * This should only be used FOR TABLES IN SCHEMA publications.
923 : */
924 : List *
925 2178 : GetPublicationSchemas(Oid pubid)
926 : {
927 2178 : List *result = NIL;
928 : Relation pubschsrel;
929 : ScanKeyData scankey;
930 : SysScanDesc scan;
931 : HeapTuple tup;
932 :
933 : /* Find all schemas associated with the publication */
934 2178 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
935 :
936 2178 : ScanKeyInit(&scankey,
937 : Anum_pg_publication_namespace_pnpubid,
938 : BTEqualStrategyNumber, F_OIDEQ,
939 : ObjectIdGetDatum(pubid));
940 :
941 2178 : scan = systable_beginscan(pubschsrel,
942 : PublicationNamespacePnnspidPnpubidIndexId,
943 : true, NULL, 1, &scankey);
944 2278 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
945 : {
946 : Form_pg_publication_namespace pubsch;
947 :
948 100 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
949 :
950 100 : result = lappend_oid(result, pubsch->pnnspid);
951 : }
952 :
953 2178 : systable_endscan(scan);
954 2178 : table_close(pubschsrel, AccessShareLock);
955 :
956 2178 : return result;
957 : }
958 :
959 : /*
960 : * Gets the list of publication oids associated with a specified schema.
961 : */
962 : List *
963 11498 : GetSchemaPublications(Oid schemaid)
964 : {
965 11498 : List *result = NIL;
966 : CatCList *pubschlist;
967 : int i;
968 :
969 : /* Find all publications associated with the schema */
970 11498 : pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
971 : ObjectIdGetDatum(schemaid));
972 11612 : for (i = 0; i < pubschlist->n_members; i++)
973 : {
974 114 : HeapTuple tup = &pubschlist->members[i]->tuple;
975 114 : Oid pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
976 :
977 114 : result = lappend_oid(result, pubid);
978 : }
979 :
980 11498 : ReleaseSysCacheList(pubschlist);
981 :
982 11498 : return result;
983 : }
984 :
985 : /*
986 : * Get the list of publishable relation oids for a specified schema.
987 : */
988 : List *
989 460 : GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
990 : {
991 : Relation classRel;
992 : ScanKeyData key[1];
993 : TableScanDesc scan;
994 : HeapTuple tuple;
995 460 : List *result = NIL;
996 :
997 : Assert(OidIsValid(schemaid));
998 :
999 460 : classRel = table_open(RelationRelationId, AccessShareLock);
1000 :
1001 460 : ScanKeyInit(&key[0],
1002 : Anum_pg_class_relnamespace,
1003 : BTEqualStrategyNumber, F_OIDEQ,
1004 : schemaid);
1005 :
1006 : /* get all the relations present in the specified schema */
1007 460 : scan = table_beginscan_catalog(classRel, 1, key);
1008 25004 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1009 : {
1010 24544 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
1011 24544 : Oid relid = relForm->oid;
1012 : char relkind;
1013 :
1014 24544 : if (!is_publishable_class(relid, relForm))
1015 10244 : continue;
1016 :
1017 14300 : relkind = get_rel_relkind(relid);
1018 14300 : if (relkind == RELKIND_RELATION)
1019 13566 : result = lappend_oid(result, relid);
1020 734 : else if (relkind == RELKIND_PARTITIONED_TABLE)
1021 : {
1022 734 : List *partitionrels = NIL;
1023 :
1024 : /*
1025 : * It is quite possible that some of the partitions are in a
1026 : * different schema than the parent table, so we need to get such
1027 : * partitions separately.
1028 : */
1029 734 : partitionrels = GetPubPartitionOptionRelations(partitionrels,
1030 : pub_partopt,
1031 : relForm->oid);
1032 734 : result = list_concat_unique_oid(result, partitionrels);
1033 : }
1034 : }
1035 :
1036 460 : table_endscan(scan);
1037 460 : table_close(classRel, AccessShareLock);
1038 460 : return result;
1039 : }
1040 :
1041 : /*
1042 : * Gets the list of all relations published by FOR TABLES IN SCHEMA
1043 : * publication.
1044 : */
1045 : List *
1046 1774 : GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
1047 : {
1048 1774 : List *result = NIL;
1049 1774 : List *pubschemalist = GetPublicationSchemas(pubid);
1050 : ListCell *cell;
1051 :
1052 1844 : foreach(cell, pubschemalist)
1053 : {
1054 70 : Oid schemaid = lfirst_oid(cell);
1055 70 : List *schemaRels = NIL;
1056 :
1057 70 : schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt);
1058 70 : result = list_concat(result, schemaRels);
1059 : }
1060 :
1061 1774 : return result;
1062 : }
1063 :
1064 : /*
1065 : * Get publication using oid
1066 : *
1067 : * The Publication struct and its data are palloc'ed here.
1068 : */
1069 : Publication *
1070 8710 : GetPublication(Oid pubid)
1071 : {
1072 : HeapTuple tup;
1073 : Publication *pub;
1074 : Form_pg_publication pubform;
1075 :
1076 8710 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1077 8710 : if (!HeapTupleIsValid(tup))
1078 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1079 :
1080 8710 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1081 :
1082 8710 : pub = (Publication *) palloc(sizeof(Publication));
1083 8710 : pub->oid = pubid;
1084 8710 : pub->name = pstrdup(NameStr(pubform->pubname));
1085 8710 : pub->alltables = pubform->puballtables;
1086 8710 : pub->pubactions.pubinsert = pubform->pubinsert;
1087 8710 : pub->pubactions.pubupdate = pubform->pubupdate;
1088 8710 : pub->pubactions.pubdelete = pubform->pubdelete;
1089 8710 : pub->pubactions.pubtruncate = pubform->pubtruncate;
1090 8710 : pub->pubviaroot = pubform->pubviaroot;
1091 8710 : pub->pubgencols_type = pubform->pubgencols;
1092 :
1093 8710 : ReleaseSysCache(tup);
1094 :
1095 8710 : return pub;
1096 : }
1097 :
1098 : /*
1099 : * Get Publication using name.
1100 : */
1101 : Publication *
1102 2436 : GetPublicationByName(const char *pubname, bool missing_ok)
1103 : {
1104 : Oid oid;
1105 :
1106 2436 : oid = get_publication_oid(pubname, missing_ok);
1107 :
1108 2434 : return OidIsValid(oid) ? GetPublication(oid) : NULL;
1109 : }
1110 :
1111 : /*
1112 : * Get information of the tables in the given publication array.
1113 : *
1114 : * Returns pubid, relid, column list, row filter for each table.
1115 : */
1116 : Datum
1117 5980 : pg_get_publication_tables(PG_FUNCTION_ARGS)
1118 : {
1119 : #define NUM_PUBLICATION_TABLES_ELEM 4
1120 : FuncCallContext *funcctx;
1121 5980 : List *table_infos = NIL;
1122 :
1123 : /* stuff done only on the first call of the function */
1124 5980 : if (SRF_IS_FIRSTCALL())
1125 : {
1126 : TupleDesc tupdesc;
1127 : MemoryContext oldcontext;
1128 : ArrayType *arr;
1129 : Datum *elems;
1130 : int nelems,
1131 : i;
1132 1920 : bool viaroot = false;
1133 :
1134 : /* create a function context for cross-call persistence */
1135 1920 : funcctx = SRF_FIRSTCALL_INIT();
1136 :
1137 : /* switch to memory context appropriate for multiple function calls */
1138 1920 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1139 :
1140 : /*
1141 : * Deconstruct the parameter into elements where each element is a
1142 : * publication name.
1143 : */
1144 1920 : arr = PG_GETARG_ARRAYTYPE_P(0);
1145 1920 : deconstruct_array_builtin(arr, TEXTOID, &elems, NULL, &nelems);
1146 :
1147 : /* Get Oids of tables from each publication. */
1148 3930 : for (i = 0; i < nelems; i++)
1149 : {
1150 : Publication *pub_elem;
1151 2010 : List *pub_elem_tables = NIL;
1152 : ListCell *lc;
1153 :
1154 2010 : pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]), false);
1155 :
1156 : /*
1157 : * Publications support partitioned tables. If
1158 : * publish_via_partition_root is false, all changes are replicated
1159 : * using leaf partition identity and schema, so we only need
1160 : * those. Otherwise, get the partitioned table itself.
1161 : */
1162 2010 : if (pub_elem->alltables)
1163 332 : pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot);
1164 : else
1165 : {
1166 : List *relids,
1167 : *schemarelids;
1168 :
1169 1678 : relids = GetPublicationRelations(pub_elem->oid,
1170 1678 : pub_elem->pubviaroot ?
1171 1678 : PUBLICATION_PART_ROOT :
1172 : PUBLICATION_PART_LEAF);
1173 1678 : schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
1174 1678 : pub_elem->pubviaroot ?
1175 1678 : PUBLICATION_PART_ROOT :
1176 : PUBLICATION_PART_LEAF);
1177 1678 : pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
1178 : }
1179 :
1180 : /*
1181 : * Record the published table and the corresponding publication so
1182 : * that we can get row filters and column lists later.
1183 : *
1184 : * When a table is published by multiple publications, to obtain
1185 : * all row filters and column lists, the structure related to this
1186 : * table will be recorded multiple times.
1187 : */
1188 6150 : foreach(lc, pub_elem_tables)
1189 : {
1190 4140 : published_rel *table_info = (published_rel *) palloc(sizeof(published_rel));
1191 :
1192 4140 : table_info->relid = lfirst_oid(lc);
1193 4140 : table_info->pubid = pub_elem->oid;
1194 4140 : table_infos = lappend(table_infos, table_info);
1195 : }
1196 :
1197 : /* At least one publication is using publish_via_partition_root. */
1198 2010 : if (pub_elem->pubviaroot)
1199 368 : viaroot = true;
1200 : }
1201 :
1202 : /*
1203 : * If the publication publishes partition changes via their respective
1204 : * root partitioned tables, we must exclude partitions in favor of
1205 : * including the root partitioned tables. Otherwise, the function
1206 : * could return both the child and parent tables which could cause
1207 : * data of the child table to be double-published on the subscriber
1208 : * side.
1209 : */
1210 1920 : if (viaroot)
1211 352 : filter_partitions(table_infos);
1212 :
1213 : /* Construct a tuple descriptor for the result rows. */
1214 1920 : tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
1215 1920 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
1216 : OIDOID, -1, 0);
1217 1920 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
1218 : OIDOID, -1, 0);
1219 1920 : TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
1220 : INT2VECTOROID, -1, 0);
1221 1920 : TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
1222 : PG_NODE_TREEOID, -1, 0);
1223 :
1224 1920 : funcctx->tuple_desc = BlessTupleDesc(tupdesc);
1225 1920 : funcctx->user_fctx = table_infos;
1226 :
1227 1920 : MemoryContextSwitchTo(oldcontext);
1228 : }
1229 :
1230 : /* stuff done on every call of the function */
1231 5980 : funcctx = SRF_PERCALL_SETUP();
1232 5980 : table_infos = (List *) funcctx->user_fctx;
1233 :
1234 5980 : if (funcctx->call_cntr < list_length(table_infos))
1235 : {
1236 4060 : HeapTuple pubtuple = NULL;
1237 : HeapTuple rettuple;
1238 : Publication *pub;
1239 4060 : published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
1240 4060 : Oid relid = table_info->relid;
1241 4060 : Oid schemaid = get_rel_namespace(relid);
1242 4060 : Datum values[NUM_PUBLICATION_TABLES_ELEM] = {0};
1243 4060 : bool nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
1244 :
1245 : /*
1246 : * Form tuple with appropriate data.
1247 : */
1248 :
1249 4060 : pub = GetPublication(table_info->pubid);
1250 :
1251 4060 : values[0] = ObjectIdGetDatum(pub->oid);
1252 4060 : values[1] = ObjectIdGetDatum(relid);
1253 :
1254 : /*
1255 : * We don't consider row filters or column lists for FOR ALL TABLES or
1256 : * FOR TABLES IN SCHEMA publications.
1257 : */
1258 4060 : if (!pub->alltables &&
1259 2578 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
1260 : ObjectIdGetDatum(schemaid),
1261 : ObjectIdGetDatum(pub->oid)))
1262 2484 : pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
1263 : ObjectIdGetDatum(relid),
1264 : ObjectIdGetDatum(pub->oid));
1265 :
1266 4060 : if (HeapTupleIsValid(pubtuple))
1267 : {
1268 : /* Lookup the column list attribute. */
1269 2258 : values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1270 : Anum_pg_publication_rel_prattrs,
1271 : &(nulls[2]));
1272 :
1273 : /* Null indicates no filter. */
1274 2258 : values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1275 : Anum_pg_publication_rel_prqual,
1276 : &(nulls[3]));
1277 : }
1278 : else
1279 : {
1280 1802 : nulls[2] = true;
1281 1802 : nulls[3] = true;
1282 : }
1283 :
1284 : /* Show all columns when the column list is not specified. */
1285 4060 : if (nulls[2])
1286 : {
1287 3808 : Relation rel = table_open(relid, AccessShareLock);
1288 3808 : int nattnums = 0;
1289 : int16 *attnums;
1290 3808 : TupleDesc desc = RelationGetDescr(rel);
1291 : int i;
1292 :
1293 3808 : attnums = (int16 *) palloc(desc->natts * sizeof(int16));
1294 :
1295 10436 : for (i = 0; i < desc->natts; i++)
1296 : {
1297 6628 : Form_pg_attribute att = TupleDescAttr(desc, i);
1298 :
1299 6628 : if (att->attisdropped)
1300 12 : continue;
1301 :
1302 6616 : if (att->attgenerated)
1303 : {
1304 : /* We only support replication of STORED generated cols. */
1305 96 : if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
1306 72 : continue;
1307 :
1308 : /*
1309 : * User hasn't requested to replicate STORED generated
1310 : * cols.
1311 : */
1312 24 : if (pub->pubgencols_type != PUBLISH_GENCOLS_STORED)
1313 18 : continue;
1314 : }
1315 :
1316 6526 : attnums[nattnums++] = att->attnum;
1317 : }
1318 :
1319 3808 : if (nattnums > 0)
1320 : {
1321 3764 : values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
1322 3764 : nulls[2] = false;
1323 : }
1324 :
1325 3808 : table_close(rel, AccessShareLock);
1326 : }
1327 :
1328 4060 : rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
1329 :
1330 4060 : SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
1331 : }
1332 :
1333 1920 : SRF_RETURN_DONE(funcctx);
1334 : }
|