Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_publication.c
4 : * publication C API manipulation
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/catalog/pg_publication.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/heapam.h"
19 : #include "access/htup_details.h"
20 : #include "access/tableam.h"
21 : #include "catalog/catalog.h"
22 : #include "catalog/dependency.h"
23 : #include "catalog/indexing.h"
24 : #include "catalog/namespace.h"
25 : #include "catalog/objectaddress.h"
26 : #include "catalog/partition.h"
27 : #include "catalog/pg_inherits.h"
28 : #include "catalog/pg_namespace.h"
29 : #include "catalog/pg_publication.h"
30 : #include "catalog/pg_publication_namespace.h"
31 : #include "catalog/pg_publication_rel.h"
32 : #include "catalog/pg_type.h"
33 : #include "commands/publicationcmds.h"
34 : #include "funcapi.h"
35 : #include "utils/array.h"
36 : #include "utils/builtins.h"
37 : #include "utils/catcache.h"
38 : #include "utils/fmgroids.h"
39 : #include "utils/lsyscache.h"
40 : #include "utils/rel.h"
41 : #include "utils/syscache.h"
42 :
43 : /* Records association between publication and published table */
44 : typedef struct
45 : {
46 : Oid relid; /* OID of published table */
47 : Oid pubid; /* OID of publication that publishes this
48 : * table. */
49 : } published_rel;
50 :
51 : /*
52 : * Check if relation can be in given publication and throws appropriate
53 : * error if not.
54 : */
55 : static void
56 1036 : check_publication_add_relation(Relation targetrel)
57 : {
58 : /* Must be a regular or partitioned table */
59 1036 : if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
60 142 : RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
61 14 : ereport(ERROR,
62 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
63 : errmsg("cannot add relation \"%s\" to publication",
64 : RelationGetRelationName(targetrel)),
65 : errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind)));
66 :
67 : /* Can't be system table */
68 1022 : if (IsCatalogRelation(targetrel))
69 6 : ereport(ERROR,
70 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
71 : errmsg("cannot add relation \"%s\" to publication",
72 : RelationGetRelationName(targetrel)),
73 : errdetail("This operation is not supported for system tables.")));
74 :
75 : /* UNLOGGED and TEMP relations cannot be part of publication. */
76 1016 : if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
77 6 : ereport(ERROR,
78 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
79 : errmsg("cannot add relation \"%s\" to publication",
80 : RelationGetRelationName(targetrel)),
81 : errdetail("This operation is not supported for temporary tables.")));
82 1010 : else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
83 6 : ereport(ERROR,
84 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
85 : errmsg("cannot add relation \"%s\" to publication",
86 : RelationGetRelationName(targetrel)),
87 : errdetail("This operation is not supported for unlogged tables.")));
88 1004 : }
89 :
90 : /*
91 : * Check if schema can be in given publication and throw appropriate error if
92 : * not.
93 : */
94 : static void
95 204 : check_publication_add_schema(Oid schemaid)
96 : {
97 : /* Can't be system namespace */
98 204 : if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid))
99 6 : ereport(ERROR,
100 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
101 : errmsg("cannot add schema \"%s\" to publication",
102 : get_namespace_name(schemaid)),
103 : errdetail("This operation is not supported for system schemas.")));
104 :
105 : /* Can't be temporary namespace */
106 198 : if (isAnyTempNamespace(schemaid))
107 0 : ereport(ERROR,
108 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
109 : errmsg("cannot add schema \"%s\" to publication",
110 : get_namespace_name(schemaid)),
111 : errdetail("Temporary schemas cannot be replicated.")));
112 198 : }
113 :
114 : /*
115 : * Returns if relation represented by oid and Form_pg_class entry
116 : * is publishable.
117 : *
118 : * Does same checks as check_publication_add_relation() above, but does not
119 : * need relation to be opened and also does not throw errors.
120 : *
121 : * XXX This also excludes all tables with relid < FirstNormalObjectId,
122 : * ie all tables created during initdb. This mainly affects the preinstalled
123 : * information_schema. IsCatalogRelationOid() only excludes tables with
124 : * relid < FirstUnpinnedObjectId, making that test rather redundant,
125 : * but really we should get rid of the FirstNormalObjectId test not
126 : * IsCatalogRelationOid. We can't do so today because we don't want
127 : * information_schema tables to be considered publishable; but this test
128 : * is really inadequate for that, since the information_schema could be
129 : * dropped and reloaded and then it'll be considered publishable. The best
130 : * long-term solution may be to add a "relispublishable" bool to pg_class,
131 : * and depend on that instead of OID checks.
132 : */
133 : static bool
134 592090 : is_publishable_class(Oid relid, Form_pg_class reltuple)
135 : {
136 603430 : return (reltuple->relkind == RELKIND_RELATION ||
137 11340 : reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
138 582158 : !IsCatalogRelationOid(relid) &&
139 1184180 : reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
140 : relid >= FirstNormalObjectId;
141 : }
142 :
143 : /*
144 : * Another variant of is_publishable_class(), taking a Relation.
145 : */
146 : bool
147 538700 : is_publishable_relation(Relation rel)
148 : {
149 538700 : return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
150 : }
151 :
152 : /*
153 : * SQL-callable variant of the above
154 : *
155 : * This returns null when the relation does not exist. This is intended to be
156 : * used for example in psql to avoid gratuitous errors when there are
157 : * concurrent catalog changes.
158 : */
159 : Datum
160 5576 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
161 : {
162 5576 : Oid relid = PG_GETARG_OID(0);
163 : HeapTuple tuple;
164 : bool result;
165 :
166 5576 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
167 5576 : if (!HeapTupleIsValid(tuple))
168 0 : PG_RETURN_NULL();
169 5576 : result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
170 5576 : ReleaseSysCache(tuple);
171 5576 : PG_RETURN_BOOL(result);
172 : }
173 :
174 : /*
175 : * Returns true if the ancestor is in the list of published relations.
176 : * Otherwise, returns false.
177 : */
178 : static bool
179 198 : is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
180 : {
181 : ListCell *lc;
182 :
183 668 : foreach(lc, table_infos)
184 : {
185 550 : Oid relid = ((published_rel *) lfirst(lc))->relid;
186 :
187 550 : if (relid == ancestor)
188 80 : return true;
189 : }
190 :
191 118 : return false;
192 : }
193 :
194 : /*
195 : * Filter out the partitions whose parent tables are also present in the list.
196 : */
197 : static void
198 336 : filter_partitions(List *table_infos)
199 : {
200 : ListCell *lc;
201 :
202 998 : foreach(lc, table_infos)
203 : {
204 662 : bool skip = false;
205 662 : List *ancestors = NIL;
206 : ListCell *lc2;
207 662 : published_rel *table_info = (published_rel *) lfirst(lc);
208 :
209 662 : if (get_rel_relispartition(table_info->relid))
210 198 : ancestors = get_partition_ancestors(table_info->relid);
211 :
212 780 : foreach(lc2, ancestors)
213 : {
214 198 : Oid ancestor = lfirst_oid(lc2);
215 :
216 198 : if (is_ancestor_member_tableinfos(ancestor, table_infos))
217 : {
218 80 : skip = true;
219 80 : break;
220 : }
221 : }
222 :
223 662 : if (skip)
224 80 : table_infos = foreach_delete_current(table_infos, lc);
225 : }
226 336 : }
227 :
228 : /*
229 : * Returns true if any schema is associated with the publication, false if no
230 : * schema is associated with the publication.
231 : */
232 : bool
233 262 : is_schema_publication(Oid pubid)
234 : {
235 : Relation pubschsrel;
236 : ScanKeyData scankey;
237 : SysScanDesc scan;
238 : HeapTuple tup;
239 262 : bool result = false;
240 :
241 262 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
242 262 : ScanKeyInit(&scankey,
243 : Anum_pg_publication_namespace_pnpubid,
244 : BTEqualStrategyNumber, F_OIDEQ,
245 : ObjectIdGetDatum(pubid));
246 :
247 262 : scan = systable_beginscan(pubschsrel,
248 : PublicationNamespacePnnspidPnpubidIndexId,
249 : true, NULL, 1, &scankey);
250 262 : tup = systable_getnext(scan);
251 262 : result = HeapTupleIsValid(tup);
252 :
253 262 : systable_endscan(scan);
254 262 : table_close(pubschsrel, AccessShareLock);
255 :
256 262 : return result;
257 : }
258 :
259 : /*
260 : * Returns true if the relation has column list associated with the
261 : * publication, false otherwise.
262 : *
263 : * If a column list is found, the corresponding bitmap is returned through the
264 : * cols parameter, if provided. The bitmap is constructed within the given
265 : * memory context (mcxt).
266 : */
267 : bool
268 1464 : check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt,
269 : Bitmapset **cols)
270 : {
271 : HeapTuple cftuple;
272 1464 : bool found = false;
273 :
274 1464 : if (pub->alltables)
275 354 : return false;
276 :
277 1110 : cftuple = SearchSysCache2(PUBLICATIONRELMAP,
278 : ObjectIdGetDatum(relid),
279 : ObjectIdGetDatum(pub->oid));
280 1110 : if (HeapTupleIsValid(cftuple))
281 : {
282 : Datum cfdatum;
283 : bool isnull;
284 :
285 : /* Lookup the column list attribute. */
286 1008 : cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
287 : Anum_pg_publication_rel_prattrs, &isnull);
288 :
289 : /* Was a column list found? */
290 1008 : if (!isnull)
291 : {
292 : /* Build the column list bitmap in the given memory context. */
293 314 : if (cols)
294 308 : *cols = pub_collist_to_bitmapset(*cols, cfdatum, mcxt);
295 :
296 314 : found = true;
297 : }
298 :
299 1008 : ReleaseSysCache(cftuple);
300 : }
301 :
302 1110 : return found;
303 : }
304 :
305 : /*
306 : * Gets the relations based on the publication partition option for a specified
307 : * relation.
308 : */
309 : List *
310 5446 : GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
311 : Oid relid)
312 : {
313 5446 : if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
314 : pub_partopt != PUBLICATION_PART_ROOT)
315 1128 : {
316 1128 : List *all_parts = find_all_inheritors(relid, NoLock,
317 : NULL);
318 :
319 1128 : if (pub_partopt == PUBLICATION_PART_ALL)
320 934 : result = list_concat(result, all_parts);
321 194 : else if (pub_partopt == PUBLICATION_PART_LEAF)
322 : {
323 : ListCell *lc;
324 :
325 710 : foreach(lc, all_parts)
326 : {
327 516 : Oid partOid = lfirst_oid(lc);
328 :
329 516 : if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
330 322 : result = lappend_oid(result, partOid);
331 : }
332 : }
333 : else
334 : Assert(false);
335 : }
336 : else
337 4318 : result = lappend_oid(result, relid);
338 :
339 5446 : return result;
340 : }
341 :
342 : /*
343 : * Returns the relid of the topmost ancestor that is published via this
344 : * publication if any and set its ancestor level to ancestor_level,
345 : * otherwise returns InvalidOid.
346 : *
347 : * The ancestor_level value allows us to compare the results for multiple
348 : * publications, and decide which value is higher up.
349 : *
350 : * Note that the list of ancestors should be ordered such that the topmost
351 : * ancestor is at the end of the list.
352 : */
353 : Oid
354 504 : GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
355 : {
356 : ListCell *lc;
357 504 : Oid topmost_relid = InvalidOid;
358 504 : int level = 0;
359 :
360 : /*
361 : * Find the "topmost" ancestor that is in this publication.
362 : */
363 1024 : foreach(lc, ancestors)
364 : {
365 520 : Oid ancestor = lfirst_oid(lc);
366 520 : List *apubids = GetRelationPublications(ancestor);
367 520 : List *aschemaPubids = NIL;
368 :
369 520 : level++;
370 :
371 520 : if (list_member_oid(apubids, puboid))
372 : {
373 312 : topmost_relid = ancestor;
374 :
375 312 : if (ancestor_level)
376 86 : *ancestor_level = level;
377 : }
378 : else
379 : {
380 208 : aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
381 208 : if (list_member_oid(aschemaPubids, puboid))
382 : {
383 10 : topmost_relid = ancestor;
384 :
385 10 : if (ancestor_level)
386 10 : *ancestor_level = level;
387 : }
388 : }
389 :
390 520 : list_free(apubids);
391 520 : list_free(aschemaPubids);
392 : }
393 :
394 504 : return topmost_relid;
395 : }
396 :
397 : /*
398 : * attnumstoint2vector
399 : * Convert a Bitmapset of AttrNumbers into an int2vector.
400 : *
401 : * AttrNumber numbers are 0-based, i.e., not offset by
402 : * FirstLowInvalidHeapAttributeNumber.
403 : */
404 : static int2vector *
405 308 : attnumstoint2vector(Bitmapset *attrs)
406 : {
407 : int2vector *result;
408 308 : int n = bms_num_members(attrs);
409 308 : int i = -1;
410 308 : int j = 0;
411 :
412 308 : result = buildint2vector(NULL, n);
413 :
414 848 : while ((i = bms_next_member(attrs, i)) >= 0)
415 : {
416 : Assert(i <= PG_INT16_MAX);
417 :
418 540 : result->values[j++] = (int16) i;
419 : }
420 :
421 308 : return result;
422 : }
423 :
424 : /*
425 : * Insert new publication / relation mapping.
426 : */
427 : ObjectAddress
428 1058 : publication_add_relation(Oid pubid, PublicationRelInfo *pri,
429 : bool if_not_exists)
430 : {
431 : Relation rel;
432 : HeapTuple tup;
433 : Datum values[Natts_pg_publication_rel];
434 : bool nulls[Natts_pg_publication_rel];
435 1058 : Relation targetrel = pri->relation;
436 1058 : Oid relid = RelationGetRelid(targetrel);
437 : Oid pubreloid;
438 : Bitmapset *attnums;
439 1058 : Publication *pub = GetPublication(pubid);
440 : ObjectAddress myself,
441 : referenced;
442 1058 : List *relids = NIL;
443 : int i;
444 :
445 1058 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
446 :
447 : /*
448 : * Check for duplicates. Note that this does not really prevent
449 : * duplicates, it's here just to provide nicer error message in common
450 : * case. The real protection is the unique key on the catalog.
451 : */
452 1058 : if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
453 : ObjectIdGetDatum(pubid)))
454 : {
455 22 : table_close(rel, RowExclusiveLock);
456 :
457 22 : if (if_not_exists)
458 16 : return InvalidObjectAddress;
459 :
460 6 : ereport(ERROR,
461 : (errcode(ERRCODE_DUPLICATE_OBJECT),
462 : errmsg("relation \"%s\" is already member of publication \"%s\"",
463 : RelationGetRelationName(targetrel), pub->name)));
464 : }
465 :
466 1036 : check_publication_add_relation(targetrel);
467 :
468 : /* Validate and translate column names into a Bitmapset of attnums. */
469 1004 : attnums = pub_collist_validate(pri->relation, pri->columns);
470 :
471 : /* Form a tuple. */
472 986 : memset(values, 0, sizeof(values));
473 986 : memset(nulls, false, sizeof(nulls));
474 :
475 986 : pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
476 : Anum_pg_publication_rel_oid);
477 986 : values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid);
478 986 : values[Anum_pg_publication_rel_prpubid - 1] =
479 986 : ObjectIdGetDatum(pubid);
480 986 : values[Anum_pg_publication_rel_prrelid - 1] =
481 986 : ObjectIdGetDatum(relid);
482 :
483 : /* Add qualifications, if available */
484 986 : if (pri->whereClause != NULL)
485 298 : values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
486 : else
487 688 : nulls[Anum_pg_publication_rel_prqual - 1] = true;
488 :
489 : /* Add column list, if available */
490 986 : if (pri->columns)
491 308 : values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(attnumstoint2vector(attnums));
492 : else
493 678 : nulls[Anum_pg_publication_rel_prattrs - 1] = true;
494 :
495 986 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
496 :
497 : /* Insert tuple into catalog. */
498 986 : CatalogTupleInsert(rel, tup);
499 986 : heap_freetuple(tup);
500 :
501 : /* Register dependencies as needed */
502 986 : ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
503 :
504 : /* Add dependency on the publication */
505 986 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
506 986 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
507 :
508 : /* Add dependency on the relation */
509 986 : ObjectAddressSet(referenced, RelationRelationId, relid);
510 986 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
511 :
512 : /* Add dependency on the objects mentioned in the qualifications */
513 986 : if (pri->whereClause)
514 298 : recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
515 : DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
516 : false);
517 :
518 : /* Add dependency on the columns, if any are listed */
519 986 : i = -1;
520 1526 : while ((i = bms_next_member(attnums, i)) >= 0)
521 : {
522 540 : ObjectAddressSubSet(referenced, RelationRelationId, relid, i);
523 540 : recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
524 : }
525 :
526 : /* Close the table. */
527 986 : table_close(rel, RowExclusiveLock);
528 :
529 : /*
530 : * Invalidate relcache so that publication info is rebuilt.
531 : *
532 : * For the partitioned tables, we must invalidate all partitions contained
533 : * in the respective partition hierarchies, not just the one explicitly
534 : * mentioned in the publication. This is required because we implicitly
535 : * publish the child tables when the parent table is published.
536 : */
537 986 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
538 : relid);
539 :
540 986 : InvalidatePublicationRels(relids);
541 :
542 986 : return myself;
543 : }
544 :
545 : /*
546 : * pub_collist_validate
547 : * Process and validate the 'columns' list and ensure the columns are all
548 : * valid to use for a publication. Checks for and raises an ERROR for
549 : * any unknown columns, system columns, or duplicate columns.
550 : *
551 : * Looks up each column's attnum and returns a 0-based Bitmapset of the
552 : * corresponding attnums.
553 : */
554 : Bitmapset *
555 1408 : pub_collist_validate(Relation targetrel, List *columns)
556 : {
557 1408 : Bitmapset *set = NULL;
558 : ListCell *lc;
559 :
560 2214 : foreach(lc, columns)
561 : {
562 836 : char *colname = strVal(lfirst(lc));
563 836 : AttrNumber attnum = get_attnum(RelationGetRelid(targetrel), colname);
564 :
565 836 : if (attnum == InvalidAttrNumber)
566 6 : ereport(ERROR,
567 : errcode(ERRCODE_UNDEFINED_COLUMN),
568 : errmsg("column \"%s\" of relation \"%s\" does not exist",
569 : colname, RelationGetRelationName(targetrel)));
570 :
571 830 : if (!AttrNumberIsForUserDefinedAttr(attnum))
572 12 : ereport(ERROR,
573 : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
574 : errmsg("cannot use system column \"%s\" in publication column list",
575 : colname));
576 :
577 818 : if (bms_is_member(attnum, set))
578 12 : ereport(ERROR,
579 : errcode(ERRCODE_DUPLICATE_OBJECT),
580 : errmsg("duplicate column \"%s\" in publication column list",
581 : colname));
582 :
583 806 : set = bms_add_member(set, attnum);
584 : }
585 :
586 1378 : return set;
587 : }
588 :
589 : /*
590 : * Transform a column list (represented by an array Datum) to a bitmapset.
591 : *
592 : * If columns isn't NULL, add the column numbers to that set.
593 : *
594 : * If mcxt isn't NULL, build the bitmapset in that context.
595 : */
596 : Bitmapset *
597 444 : pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
598 : {
599 444 : Bitmapset *result = columns;
600 : ArrayType *arr;
601 : int nelems;
602 : int16 *elems;
603 444 : MemoryContext oldcxt = NULL;
604 :
605 444 : arr = DatumGetArrayTypeP(pubcols);
606 444 : nelems = ARR_DIMS(arr)[0];
607 444 : elems = (int16 *) ARR_DATA_PTR(arr);
608 :
609 : /* If a memory context was specified, switch to it. */
610 444 : if (mcxt)
611 78 : oldcxt = MemoryContextSwitchTo(mcxt);
612 :
613 1226 : for (int i = 0; i < nelems; i++)
614 782 : result = bms_add_member(result, elems[i]);
615 :
616 444 : if (mcxt)
617 78 : MemoryContextSwitchTo(oldcxt);
618 :
619 444 : return result;
620 : }
621 :
622 : /*
623 : * Returns a bitmap representing the columns of the specified table.
624 : *
625 : * Generated columns are included if include_gencols is true.
626 : */
627 : Bitmapset *
628 18 : pub_form_cols_map(Relation relation, bool include_gencols)
629 : {
630 18 : Bitmapset *result = NULL;
631 18 : TupleDesc desc = RelationGetDescr(relation);
632 :
633 58 : for (int i = 0; i < desc->natts; i++)
634 : {
635 40 : Form_pg_attribute att = TupleDescAttr(desc, i);
636 :
637 40 : if (att->attisdropped || (att->attgenerated && !include_gencols))
638 4 : continue;
639 :
640 36 : result = bms_add_member(result, att->attnum);
641 : }
642 :
643 18 : return result;
644 : }
645 :
646 : /*
647 : * Insert new publication / schema mapping.
648 : */
649 : ObjectAddress
650 222 : publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
651 : {
652 : Relation rel;
653 : HeapTuple tup;
654 : Datum values[Natts_pg_publication_namespace];
655 : bool nulls[Natts_pg_publication_namespace];
656 : Oid psschid;
657 222 : Publication *pub = GetPublication(pubid);
658 222 : List *schemaRels = NIL;
659 : ObjectAddress myself,
660 : referenced;
661 :
662 222 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
663 :
664 : /*
665 : * Check for duplicates. Note that this does not really prevent
666 : * duplicates, it's here just to provide nicer error message in common
667 : * case. The real protection is the unique key on the catalog.
668 : */
669 222 : if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
670 : ObjectIdGetDatum(schemaid),
671 : ObjectIdGetDatum(pubid)))
672 : {
673 18 : table_close(rel, RowExclusiveLock);
674 :
675 18 : if (if_not_exists)
676 12 : return InvalidObjectAddress;
677 :
678 6 : ereport(ERROR,
679 : (errcode(ERRCODE_DUPLICATE_OBJECT),
680 : errmsg("schema \"%s\" is already member of publication \"%s\"",
681 : get_namespace_name(schemaid), pub->name)));
682 : }
683 :
684 204 : check_publication_add_schema(schemaid);
685 :
686 : /* Form a tuple */
687 198 : memset(values, 0, sizeof(values));
688 198 : memset(nulls, false, sizeof(nulls));
689 :
690 198 : psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId,
691 : Anum_pg_publication_namespace_oid);
692 198 : values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid);
693 198 : values[Anum_pg_publication_namespace_pnpubid - 1] =
694 198 : ObjectIdGetDatum(pubid);
695 198 : values[Anum_pg_publication_namespace_pnnspid - 1] =
696 198 : ObjectIdGetDatum(schemaid);
697 :
698 198 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
699 :
700 : /* Insert tuple into catalog */
701 198 : CatalogTupleInsert(rel, tup);
702 198 : heap_freetuple(tup);
703 :
704 198 : ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid);
705 :
706 : /* Add dependency on the publication */
707 198 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
708 198 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
709 :
710 : /* Add dependency on the schema */
711 198 : ObjectAddressSet(referenced, NamespaceRelationId, schemaid);
712 198 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
713 :
714 : /* Close the table */
715 198 : table_close(rel, RowExclusiveLock);
716 :
717 : /*
718 : * Invalidate relcache so that publication info is rebuilt. See
719 : * publication_add_relation for why we need to consider all the
720 : * partitions.
721 : */
722 198 : schemaRels = GetSchemaPublicationRelations(schemaid,
723 : PUBLICATION_PART_ALL);
724 198 : InvalidatePublicationRels(schemaRels);
725 :
726 198 : return myself;
727 : }
728 :
729 : /* Gets list of publication oids for a relation */
730 : List *
731 11680 : GetRelationPublications(Oid relid)
732 : {
733 11680 : List *result = NIL;
734 : CatCList *pubrellist;
735 : int i;
736 :
737 : /* Find all publications associated with the relation. */
738 11680 : pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
739 : ObjectIdGetDatum(relid));
740 13204 : for (i = 0; i < pubrellist->n_members; i++)
741 : {
742 1524 : HeapTuple tup = &pubrellist->members[i]->tuple;
743 1524 : Oid pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
744 :
745 1524 : result = lappend_oid(result, pubid);
746 : }
747 :
748 11680 : ReleaseSysCacheList(pubrellist);
749 :
750 11680 : return result;
751 : }
752 :
753 : /*
754 : * Gets list of relation oids for a publication.
755 : *
756 : * This should only be used FOR TABLE publications, the FOR ALL TABLES
757 : * should use GetAllTablesPublicationRelations().
758 : */
759 : List *
760 2130 : GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
761 : {
762 : List *result;
763 : Relation pubrelsrel;
764 : ScanKeyData scankey;
765 : SysScanDesc scan;
766 : HeapTuple tup;
767 :
768 : /* Find all publications associated with the relation. */
769 2130 : pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
770 :
771 2130 : ScanKeyInit(&scankey,
772 : Anum_pg_publication_rel_prpubid,
773 : BTEqualStrategyNumber, F_OIDEQ,
774 : ObjectIdGetDatum(pubid));
775 :
776 2130 : scan = systable_beginscan(pubrelsrel, PublicationRelPrpubidIndexId,
777 : true, NULL, 1, &scankey);
778 :
779 2130 : result = NIL;
780 5010 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
781 : {
782 : Form_pg_publication_rel pubrel;
783 :
784 2880 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
785 2880 : result = GetPubPartitionOptionRelations(result, pub_partopt,
786 : pubrel->prrelid);
787 : }
788 :
789 2130 : systable_endscan(scan);
790 2130 : table_close(pubrelsrel, AccessShareLock);
791 :
792 : /* Now sort and de-duplicate the result list */
793 2130 : list_sort(result, list_oid_cmp);
794 2130 : list_deduplicate_oid(result);
795 :
796 2130 : return result;
797 : }
798 :
799 : /*
800 : * Gets list of publication oids for publications marked as FOR ALL TABLES.
801 : */
802 : List *
803 7978 : GetAllTablesPublications(void)
804 : {
805 : List *result;
806 : Relation rel;
807 : ScanKeyData scankey;
808 : SysScanDesc scan;
809 : HeapTuple tup;
810 :
811 : /* Find all publications that are marked as for all tables. */
812 7978 : rel = table_open(PublicationRelationId, AccessShareLock);
813 :
814 7978 : ScanKeyInit(&scankey,
815 : Anum_pg_publication_puballtables,
816 : BTEqualStrategyNumber, F_BOOLEQ,
817 : BoolGetDatum(true));
818 :
819 7978 : scan = systable_beginscan(rel, InvalidOid, false,
820 : NULL, 1, &scankey);
821 :
822 7978 : result = NIL;
823 8170 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
824 : {
825 192 : Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
826 :
827 192 : result = lappend_oid(result, oid);
828 : }
829 :
830 7978 : systable_endscan(scan);
831 7978 : table_close(rel, AccessShareLock);
832 :
833 7978 : return result;
834 : }
835 :
836 : /*
837 : * Gets list of all relation published by FOR ALL TABLES publication(s).
838 : *
839 : * If the publication publishes partition changes via their respective root
840 : * partitioned tables, we must exclude partitions in favor of including the
841 : * root partitioned tables.
842 : */
843 : List *
844 332 : GetAllTablesPublicationRelations(bool pubviaroot)
845 : {
846 : Relation classRel;
847 : ScanKeyData key[1];
848 : TableScanDesc scan;
849 : HeapTuple tuple;
850 332 : List *result = NIL;
851 :
852 332 : classRel = table_open(RelationRelationId, AccessShareLock);
853 :
854 332 : ScanKeyInit(&key[0],
855 : Anum_pg_class_relkind,
856 : BTEqualStrategyNumber, F_CHAREQ,
857 : CharGetDatum(RELKIND_RELATION));
858 :
859 332 : scan = table_beginscan_catalog(classRel, 1, key);
860 :
861 24480 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
862 : {
863 24148 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
864 24148 : Oid relid = relForm->oid;
865 :
866 24148 : if (is_publishable_class(relid, relForm) &&
867 1560 : !(relForm->relispartition && pubviaroot))
868 1378 : result = lappend_oid(result, relid);
869 : }
870 :
871 332 : table_endscan(scan);
872 :
873 332 : if (pubviaroot)
874 : {
875 26 : ScanKeyInit(&key[0],
876 : Anum_pg_class_relkind,
877 : BTEqualStrategyNumber, F_CHAREQ,
878 : CharGetDatum(RELKIND_PARTITIONED_TABLE));
879 :
880 26 : scan = table_beginscan_catalog(classRel, 1, key);
881 :
882 156 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
883 : {
884 130 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
885 130 : Oid relid = relForm->oid;
886 :
887 130 : if (is_publishable_class(relid, relForm) &&
888 130 : !relForm->relispartition)
889 104 : result = lappend_oid(result, relid);
890 : }
891 :
892 26 : table_endscan(scan);
893 : }
894 :
895 332 : table_close(classRel, AccessShareLock);
896 332 : return result;
897 : }
898 :
899 : /*
900 : * Gets the list of schema oids for a publication.
901 : *
902 : * This should only be used FOR TABLES IN SCHEMA publications.
903 : */
904 : List *
905 2048 : GetPublicationSchemas(Oid pubid)
906 : {
907 2048 : List *result = NIL;
908 : Relation pubschsrel;
909 : ScanKeyData scankey;
910 : SysScanDesc scan;
911 : HeapTuple tup;
912 :
913 : /* Find all schemas associated with the publication */
914 2048 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
915 :
916 2048 : ScanKeyInit(&scankey,
917 : Anum_pg_publication_namespace_pnpubid,
918 : BTEqualStrategyNumber, F_OIDEQ,
919 : ObjectIdGetDatum(pubid));
920 :
921 2048 : scan = systable_beginscan(pubschsrel,
922 : PublicationNamespacePnnspidPnpubidIndexId,
923 : true, NULL, 1, &scankey);
924 2148 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
925 : {
926 : Form_pg_publication_namespace pubsch;
927 :
928 100 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
929 :
930 100 : result = lappend_oid(result, pubsch->pnnspid);
931 : }
932 :
933 2048 : systable_endscan(scan);
934 2048 : table_close(pubschsrel, AccessShareLock);
935 :
936 2048 : return result;
937 : }
938 :
939 : /*
940 : * Gets the list of publication oids associated with a specified schema.
941 : */
942 : List *
943 11318 : GetSchemaPublications(Oid schemaid)
944 : {
945 11318 : List *result = NIL;
946 : CatCList *pubschlist;
947 : int i;
948 :
949 : /* Find all publications associated with the schema */
950 11318 : pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
951 : ObjectIdGetDatum(schemaid));
952 11432 : for (i = 0; i < pubschlist->n_members; i++)
953 : {
954 114 : HeapTuple tup = &pubschlist->members[i]->tuple;
955 114 : Oid pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
956 :
957 114 : result = lappend_oid(result, pubid);
958 : }
959 :
960 11318 : ReleaseSysCacheList(pubschlist);
961 :
962 11318 : return result;
963 : }
964 :
965 : /*
966 : * Get the list of publishable relation oids for a specified schema.
967 : */
968 : List *
969 460 : GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
970 : {
971 : Relation classRel;
972 : ScanKeyData key[1];
973 : TableScanDesc scan;
974 : HeapTuple tuple;
975 460 : List *result = NIL;
976 :
977 : Assert(OidIsValid(schemaid));
978 :
979 460 : classRel = table_open(RelationRelationId, AccessShareLock);
980 :
981 460 : ScanKeyInit(&key[0],
982 : Anum_pg_class_relnamespace,
983 : BTEqualStrategyNumber, F_OIDEQ,
984 : schemaid);
985 :
986 : /* get all the relations present in the specified schema */
987 460 : scan = table_beginscan_catalog(classRel, 1, key);
988 23996 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
989 : {
990 23536 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
991 23536 : Oid relid = relForm->oid;
992 : char relkind;
993 :
994 23536 : if (!is_publishable_class(relid, relForm))
995 9620 : continue;
996 :
997 13916 : relkind = get_rel_relkind(relid);
998 13916 : if (relkind == RELKIND_RELATION)
999 13182 : result = lappend_oid(result, relid);
1000 734 : else if (relkind == RELKIND_PARTITIONED_TABLE)
1001 : {
1002 734 : List *partitionrels = NIL;
1003 :
1004 : /*
1005 : * It is quite possible that some of the partitions are in a
1006 : * different schema than the parent table, so we need to get such
1007 : * partitions separately.
1008 : */
1009 734 : partitionrels = GetPubPartitionOptionRelations(partitionrels,
1010 : pub_partopt,
1011 : relForm->oid);
1012 734 : result = list_concat_unique_oid(result, partitionrels);
1013 : }
1014 : }
1015 :
1016 460 : table_endscan(scan);
1017 460 : table_close(classRel, AccessShareLock);
1018 460 : return result;
1019 : }
1020 :
1021 : /*
1022 : * Gets the list of all relations published by FOR TABLES IN SCHEMA
1023 : * publication.
1024 : */
1025 : List *
1026 1644 : GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
1027 : {
1028 1644 : List *result = NIL;
1029 1644 : List *pubschemalist = GetPublicationSchemas(pubid);
1030 : ListCell *cell;
1031 :
1032 1714 : foreach(cell, pubschemalist)
1033 : {
1034 70 : Oid schemaid = lfirst_oid(cell);
1035 70 : List *schemaRels = NIL;
1036 :
1037 70 : schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt);
1038 70 : result = list_concat(result, schemaRels);
1039 : }
1040 :
1041 1644 : return result;
1042 : }
1043 :
1044 : /*
1045 : * Get publication using oid
1046 : *
1047 : * The Publication struct and its data are palloc'ed here.
1048 : */
1049 : Publication *
1050 8384 : GetPublication(Oid pubid)
1051 : {
1052 : HeapTuple tup;
1053 : Publication *pub;
1054 : Form_pg_publication pubform;
1055 :
1056 8384 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1057 8384 : if (!HeapTupleIsValid(tup))
1058 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1059 :
1060 8384 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1061 :
1062 8384 : pub = (Publication *) palloc(sizeof(Publication));
1063 8384 : pub->oid = pubid;
1064 8384 : pub->name = pstrdup(NameStr(pubform->pubname));
1065 8384 : pub->alltables = pubform->puballtables;
1066 8384 : pub->pubactions.pubinsert = pubform->pubinsert;
1067 8384 : pub->pubactions.pubupdate = pubform->pubupdate;
1068 8384 : pub->pubactions.pubdelete = pubform->pubdelete;
1069 8384 : pub->pubactions.pubtruncate = pubform->pubtruncate;
1070 8384 : pub->pubviaroot = pubform->pubviaroot;
1071 8384 : pub->pubgencols = pubform->pubgencols;
1072 :
1073 8384 : ReleaseSysCache(tup);
1074 :
1075 8384 : return pub;
1076 : }
1077 :
1078 : /*
1079 : * Get Publication using name.
1080 : */
1081 : Publication *
1082 2302 : GetPublicationByName(const char *pubname, bool missing_ok)
1083 : {
1084 : Oid oid;
1085 :
1086 2302 : oid = get_publication_oid(pubname, missing_ok);
1087 :
1088 2300 : return OidIsValid(oid) ? GetPublication(oid) : NULL;
1089 : }
1090 :
1091 : /*
1092 : * Get information of the tables in the given publication array.
1093 : *
1094 : * Returns pubid, relid, column list, row filter for each table.
1095 : */
1096 : Datum
1097 5708 : pg_get_publication_tables(PG_FUNCTION_ARGS)
1098 : {
1099 : #define NUM_PUBLICATION_TABLES_ELEM 4
1100 : FuncCallContext *funcctx;
1101 5708 : List *table_infos = NIL;
1102 :
1103 : /* stuff done only on the first call of the function */
1104 5708 : if (SRF_IS_FIRSTCALL())
1105 : {
1106 : TupleDesc tupdesc;
1107 : MemoryContext oldcontext;
1108 : ArrayType *arr;
1109 : Datum *elems;
1110 : int nelems,
1111 : i;
1112 1794 : bool viaroot = false;
1113 :
1114 : /* create a function context for cross-call persistence */
1115 1794 : funcctx = SRF_FIRSTCALL_INIT();
1116 :
1117 : /* switch to memory context appropriate for multiple function calls */
1118 1794 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1119 :
1120 : /*
1121 : * Deconstruct the parameter into elements where each element is a
1122 : * publication name.
1123 : */
1124 1794 : arr = PG_GETARG_ARRAYTYPE_P(0);
1125 1794 : deconstruct_array_builtin(arr, TEXTOID, &elems, NULL, &nelems);
1126 :
1127 : /* Get Oids of tables from each publication. */
1128 3674 : for (i = 0; i < nelems; i++)
1129 : {
1130 : Publication *pub_elem;
1131 1880 : List *pub_elem_tables = NIL;
1132 : ListCell *lc;
1133 :
1134 1880 : pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]), false);
1135 :
1136 : /*
1137 : * Publications support partitioned tables. If
1138 : * publish_via_partition_root is false, all changes are replicated
1139 : * using leaf partition identity and schema, so we only need
1140 : * those. Otherwise, get the partitioned table itself.
1141 : */
1142 1880 : if (pub_elem->alltables)
1143 332 : pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot);
1144 : else
1145 : {
1146 : List *relids,
1147 : *schemarelids;
1148 :
1149 1548 : relids = GetPublicationRelations(pub_elem->oid,
1150 1548 : pub_elem->pubviaroot ?
1151 1548 : PUBLICATION_PART_ROOT :
1152 : PUBLICATION_PART_LEAF);
1153 1548 : schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
1154 1548 : pub_elem->pubviaroot ?
1155 1548 : PUBLICATION_PART_ROOT :
1156 : PUBLICATION_PART_LEAF);
1157 1548 : pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
1158 : }
1159 :
1160 : /*
1161 : * Record the published table and the corresponding publication so
1162 : * that we can get row filters and column lists later.
1163 : *
1164 : * When a table is published by multiple publications, to obtain
1165 : * all row filters and column lists, the structure related to this
1166 : * table will be recorded multiple times.
1167 : */
1168 5874 : foreach(lc, pub_elem_tables)
1169 : {
1170 3994 : published_rel *table_info = (published_rel *) palloc(sizeof(published_rel));
1171 :
1172 3994 : table_info->relid = lfirst_oid(lc);
1173 3994 : table_info->pubid = pub_elem->oid;
1174 3994 : table_infos = lappend(table_infos, table_info);
1175 : }
1176 :
1177 : /* At least one publication is using publish_via_partition_root. */
1178 1880 : if (pub_elem->pubviaroot)
1179 352 : viaroot = true;
1180 : }
1181 :
1182 : /*
1183 : * If the publication publishes partition changes via their respective
1184 : * root partitioned tables, we must exclude partitions in favor of
1185 : * including the root partitioned tables. Otherwise, the function
1186 : * could return both the child and parent tables which could cause
1187 : * data of the child table to be double-published on the subscriber
1188 : * side.
1189 : */
1190 1794 : if (viaroot)
1191 336 : filter_partitions(table_infos);
1192 :
1193 : /* Construct a tuple descriptor for the result rows. */
1194 1794 : tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
1195 1794 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
1196 : OIDOID, -1, 0);
1197 1794 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
1198 : OIDOID, -1, 0);
1199 1794 : TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
1200 : INT2VECTOROID, -1, 0);
1201 1794 : TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
1202 : PG_NODE_TREEOID, -1, 0);
1203 :
1204 1794 : funcctx->tuple_desc = BlessTupleDesc(tupdesc);
1205 1794 : funcctx->user_fctx = table_infos;
1206 :
1207 1794 : MemoryContextSwitchTo(oldcontext);
1208 : }
1209 :
1210 : /* stuff done on every call of the function */
1211 5708 : funcctx = SRF_PERCALL_SETUP();
1212 5708 : table_infos = (List *) funcctx->user_fctx;
1213 :
1214 5708 : if (funcctx->call_cntr < list_length(table_infos))
1215 : {
1216 3914 : HeapTuple pubtuple = NULL;
1217 : HeapTuple rettuple;
1218 : Publication *pub;
1219 3914 : published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
1220 3914 : Oid relid = table_info->relid;
1221 3914 : Oid schemaid = get_rel_namespace(relid);
1222 3914 : Datum values[NUM_PUBLICATION_TABLES_ELEM] = {0};
1223 3914 : bool nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
1224 :
1225 : /*
1226 : * Form tuple with appropriate data.
1227 : */
1228 :
1229 3914 : pub = GetPublication(table_info->pubid);
1230 :
1231 3914 : values[0] = ObjectIdGetDatum(pub->oid);
1232 3914 : values[1] = ObjectIdGetDatum(relid);
1233 :
1234 : /*
1235 : * We don't consider row filters or column lists for FOR ALL TABLES or
1236 : * FOR TABLES IN SCHEMA publications.
1237 : */
1238 3914 : if (!pub->alltables &&
1239 2432 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
1240 : ObjectIdGetDatum(schemaid),
1241 : ObjectIdGetDatum(pub->oid)))
1242 2338 : pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
1243 : ObjectIdGetDatum(relid),
1244 : ObjectIdGetDatum(pub->oid));
1245 :
1246 3914 : if (HeapTupleIsValid(pubtuple))
1247 : {
1248 : /* Lookup the column list attribute. */
1249 2120 : values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1250 : Anum_pg_publication_rel_prattrs,
1251 : &(nulls[2]));
1252 :
1253 : /* Null indicates no filter. */
1254 2120 : values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1255 : Anum_pg_publication_rel_prqual,
1256 : &(nulls[3]));
1257 : }
1258 : else
1259 : {
1260 1794 : nulls[2] = true;
1261 1794 : nulls[3] = true;
1262 : }
1263 :
1264 : /* Show all columns when the column list is not specified. */
1265 3914 : if (nulls[2])
1266 : {
1267 3662 : Relation rel = table_open(relid, AccessShareLock);
1268 3662 : int nattnums = 0;
1269 : int16 *attnums;
1270 3662 : TupleDesc desc = RelationGetDescr(rel);
1271 : int i;
1272 :
1273 3662 : attnums = (int16 *) palloc(desc->natts * sizeof(int16));
1274 :
1275 9980 : for (i = 0; i < desc->natts; i++)
1276 : {
1277 6318 : Form_pg_attribute att = TupleDescAttr(desc, i);
1278 :
1279 6318 : if (att->attisdropped || (att->attgenerated && !pub->pubgencols))
1280 30 : continue;
1281 :
1282 6288 : attnums[nattnums++] = att->attnum;
1283 : }
1284 :
1285 3662 : if (nattnums > 0)
1286 : {
1287 3618 : values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
1288 3618 : nulls[2] = false;
1289 : }
1290 :
1291 3662 : table_close(rel, AccessShareLock);
1292 : }
1293 :
1294 3914 : rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
1295 :
1296 3914 : SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
1297 : }
1298 :
1299 1794 : SRF_RETURN_DONE(funcctx);
1300 : }
|