Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_publication.c
4 : * publication C API manipulation
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/catalog/pg_publication.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/heapam.h"
19 : #include "access/htup_details.h"
20 : #include "access/tableam.h"
21 : #include "access/xact.h"
22 : #include "catalog/catalog.h"
23 : #include "catalog/dependency.h"
24 : #include "catalog/index.h"
25 : #include "catalog/indexing.h"
26 : #include "catalog/namespace.h"
27 : #include "catalog/partition.h"
28 : #include "catalog/objectaccess.h"
29 : #include "catalog/objectaddress.h"
30 : #include "catalog/pg_inherits.h"
31 : #include "catalog/pg_namespace.h"
32 : #include "catalog/pg_publication.h"
33 : #include "catalog/pg_publication_namespace.h"
34 : #include "catalog/pg_publication_rel.h"
35 : #include "catalog/pg_type.h"
36 : #include "commands/publicationcmds.h"
37 : #include "funcapi.h"
38 : #include "miscadmin.h"
39 : #include "utils/array.h"
40 : #include "utils/builtins.h"
41 : #include "utils/catcache.h"
42 : #include "utils/fmgroids.h"
43 : #include "utils/inval.h"
44 : #include "utils/lsyscache.h"
45 : #include "utils/rel.h"
46 : #include "utils/syscache.h"
47 :
48 : /* Records association between publication and published table */
49 : typedef struct
50 : {
51 : Oid relid; /* OID of published table */
52 : Oid pubid; /* OID of publication that publishes this
53 : * table. */
54 : } published_rel;
55 :
56 : static void publication_translate_columns(Relation targetrel, List *columns,
57 : int *natts, AttrNumber **attrs);
58 :
59 : /*
60 : * Check if relation can be in given publication and throws appropriate
61 : * error if not.
62 : */
63 : static void
64 966 : check_publication_add_relation(Relation targetrel)
65 : {
66 : /* Must be a regular or partitioned table */
67 966 : if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
68 142 : RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
69 14 : ereport(ERROR,
70 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
71 : errmsg("cannot add relation \"%s\" to publication",
72 : RelationGetRelationName(targetrel)),
73 : errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind)));
74 :
75 : /* Can't be system table */
76 952 : if (IsCatalogRelation(targetrel))
77 6 : ereport(ERROR,
78 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
79 : errmsg("cannot add relation \"%s\" to publication",
80 : RelationGetRelationName(targetrel)),
81 : errdetail("This operation is not supported for system tables.")));
82 :
83 : /* UNLOGGED and TEMP relations cannot be part of publication. */
84 946 : if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
85 6 : ereport(ERROR,
86 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
87 : errmsg("cannot add relation \"%s\" to publication",
88 : RelationGetRelationName(targetrel)),
89 : errdetail("This operation is not supported for temporary tables.")));
90 940 : else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
91 6 : ereport(ERROR,
92 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
93 : errmsg("cannot add relation \"%s\" to publication",
94 : RelationGetRelationName(targetrel)),
95 : errdetail("This operation is not supported for unlogged tables.")));
96 934 : }
97 :
98 : /*
99 : * Check if schema can be in given publication and throw appropriate error if
100 : * not.
101 : */
102 : static void
103 204 : check_publication_add_schema(Oid schemaid)
104 : {
105 : /* Can't be system namespace */
106 204 : if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid))
107 6 : ereport(ERROR,
108 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
109 : errmsg("cannot add schema \"%s\" to publication",
110 : get_namespace_name(schemaid)),
111 : errdetail("This operation is not supported for system schemas.")));
112 :
113 : /* Can't be temporary namespace */
114 198 : if (isAnyTempNamespace(schemaid))
115 0 : ereport(ERROR,
116 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
117 : errmsg("cannot add schema \"%s\" to publication",
118 : get_namespace_name(schemaid)),
119 : errdetail("Temporary schemas cannot be replicated.")));
120 198 : }
121 :
122 : /*
123 : * Returns if relation represented by oid and Form_pg_class entry
124 : * is publishable.
125 : *
126 : * Does same checks as check_publication_add_relation() above, but does not
127 : * need relation to be opened and also does not throw errors.
128 : *
129 : * XXX This also excludes all tables with relid < FirstNormalObjectId,
130 : * ie all tables created during initdb. This mainly affects the preinstalled
131 : * information_schema. IsCatalogRelationOid() only excludes tables with
132 : * relid < FirstUnpinnedObjectId, making that test rather redundant,
133 : * but really we should get rid of the FirstNormalObjectId test not
134 : * IsCatalogRelationOid. We can't do so today because we don't want
135 : * information_schema tables to be considered publishable; but this test
136 : * is really inadequate for that, since the information_schema could be
137 : * dropped and reloaded and then it'll be considered publishable. The best
138 : * long-term solution may be to add a "relispublishable" bool to pg_class,
139 : * and depend on that instead of OID checks.
140 : */
141 : static bool
142 586290 : is_publishable_class(Oid relid, Form_pg_class reltuple)
143 : {
144 597126 : return (reltuple->relkind == RELKIND_RELATION ||
145 10836 : reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
146 576598 : !IsCatalogRelationOid(relid) &&
147 1172580 : reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
148 : relid >= FirstNormalObjectId;
149 : }
150 :
151 : /*
152 : * Another variant of is_publishable_class(), taking a Relation.
153 : */
154 : bool
155 540164 : is_publishable_relation(Relation rel)
156 : {
157 540164 : return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
158 : }
159 :
160 : /*
161 : * SQL-callable variant of the above
162 : *
163 : * This returns null when the relation does not exist. This is intended to be
164 : * used for example in psql to avoid gratuitous errors when there are
165 : * concurrent catalog changes.
166 : */
167 : Datum
168 4180 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
169 : {
170 4180 : Oid relid = PG_GETARG_OID(0);
171 : HeapTuple tuple;
172 : bool result;
173 :
174 4180 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
175 4180 : if (!HeapTupleIsValid(tuple))
176 0 : PG_RETURN_NULL();
177 4180 : result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
178 4180 : ReleaseSysCache(tuple);
179 4180 : PG_RETURN_BOOL(result);
180 : }
181 :
182 : /*
183 : * Returns true if the ancestor is in the list of published relations.
184 : * Otherwise, returns false.
185 : */
186 : static bool
187 198 : is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
188 : {
189 : ListCell *lc;
190 :
191 668 : foreach(lc, table_infos)
192 : {
193 550 : Oid relid = ((published_rel *) lfirst(lc))->relid;
194 :
195 550 : if (relid == ancestor)
196 80 : return true;
197 : }
198 :
199 118 : return false;
200 : }
201 :
202 : /*
203 : * Filter out the partitions whose parent tables are also present in the list.
204 : */
205 : static void
206 320 : filter_partitions(List *table_infos)
207 : {
208 : ListCell *lc;
209 :
210 956 : foreach(lc, table_infos)
211 : {
212 636 : bool skip = false;
213 636 : List *ancestors = NIL;
214 : ListCell *lc2;
215 636 : published_rel *table_info = (published_rel *) lfirst(lc);
216 :
217 636 : if (get_rel_relispartition(table_info->relid))
218 198 : ancestors = get_partition_ancestors(table_info->relid);
219 :
220 754 : foreach(lc2, ancestors)
221 : {
222 198 : Oid ancestor = lfirst_oid(lc2);
223 :
224 198 : if (is_ancestor_member_tableinfos(ancestor, table_infos))
225 : {
226 80 : skip = true;
227 80 : break;
228 : }
229 : }
230 :
231 636 : if (skip)
232 80 : table_infos = foreach_delete_current(table_infos, lc);
233 : }
234 320 : }
235 :
236 : /*
237 : * Returns true if any schema is associated with the publication, false if no
238 : * schema is associated with the publication.
239 : */
240 : bool
241 250 : is_schema_publication(Oid pubid)
242 : {
243 : Relation pubschsrel;
244 : ScanKeyData scankey;
245 : SysScanDesc scan;
246 : HeapTuple tup;
247 250 : bool result = false;
248 :
249 250 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
250 250 : ScanKeyInit(&scankey,
251 : Anum_pg_publication_namespace_pnpubid,
252 : BTEqualStrategyNumber, F_OIDEQ,
253 : ObjectIdGetDatum(pubid));
254 :
255 250 : scan = systable_beginscan(pubschsrel,
256 : PublicationNamespacePnnspidPnpubidIndexId,
257 : true, NULL, 1, &scankey);
258 250 : tup = systable_getnext(scan);
259 250 : result = HeapTupleIsValid(tup);
260 :
261 250 : systable_endscan(scan);
262 250 : table_close(pubschsrel, AccessShareLock);
263 :
264 250 : return result;
265 : }
266 :
267 : /*
268 : * Gets the relations based on the publication partition option for a specified
269 : * relation.
270 : */
271 : List *
272 4938 : GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
273 : Oid relid)
274 : {
275 4938 : if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
276 : pub_partopt != PUBLICATION_PART_ROOT)
277 934 : {
278 934 : List *all_parts = find_all_inheritors(relid, NoLock,
279 : NULL);
280 :
281 934 : if (pub_partopt == PUBLICATION_PART_ALL)
282 740 : result = list_concat(result, all_parts);
283 194 : else if (pub_partopt == PUBLICATION_PART_LEAF)
284 : {
285 : ListCell *lc;
286 :
287 710 : foreach(lc, all_parts)
288 : {
289 516 : Oid partOid = lfirst_oid(lc);
290 :
291 516 : if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
292 322 : result = lappend_oid(result, partOid);
293 : }
294 : }
295 : else
296 : Assert(false);
297 : }
298 : else
299 4004 : result = lappend_oid(result, relid);
300 :
301 4938 : return result;
302 : }
303 :
304 : /*
305 : * Returns the relid of the topmost ancestor that is published via this
306 : * publication if any and set its ancestor level to ancestor_level,
307 : * otherwise returns InvalidOid.
308 : *
309 : * The ancestor_level value allows us to compare the results for multiple
310 : * publications, and decide which value is higher up.
311 : *
312 : * Note that the list of ancestors should be ordered such that the topmost
313 : * ancestor is at the end of the list.
314 : */
315 : Oid
316 432 : GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
317 : {
318 : ListCell *lc;
319 432 : Oid topmost_relid = InvalidOid;
320 432 : int level = 0;
321 :
322 : /*
323 : * Find the "topmost" ancestor that is in this publication.
324 : */
325 878 : foreach(lc, ancestors)
326 : {
327 446 : Oid ancestor = lfirst_oid(lc);
328 446 : List *apubids = GetRelationPublications(ancestor);
329 446 : List *aschemaPubids = NIL;
330 :
331 446 : level++;
332 :
333 446 : if (list_member_oid(apubids, puboid))
334 : {
335 292 : topmost_relid = ancestor;
336 :
337 292 : if (ancestor_level)
338 70 : *ancestor_level = level;
339 : }
340 : else
341 : {
342 154 : aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
343 154 : if (list_member_oid(aschemaPubids, puboid))
344 : {
345 10 : topmost_relid = ancestor;
346 :
347 10 : if (ancestor_level)
348 10 : *ancestor_level = level;
349 : }
350 : }
351 :
352 446 : list_free(apubids);
353 446 : list_free(aschemaPubids);
354 : }
355 :
356 432 : return topmost_relid;
357 : }
358 :
359 : /*
360 : * Insert new publication / relation mapping.
361 : */
362 : ObjectAddress
363 988 : publication_add_relation(Oid pubid, PublicationRelInfo *pri,
364 : bool if_not_exists)
365 : {
366 : Relation rel;
367 : HeapTuple tup;
368 : Datum values[Natts_pg_publication_rel];
369 : bool nulls[Natts_pg_publication_rel];
370 988 : Relation targetrel = pri->relation;
371 988 : Oid relid = RelationGetRelid(targetrel);
372 : Oid pubreloid;
373 988 : Publication *pub = GetPublication(pubid);
374 988 : AttrNumber *attarray = NULL;
375 988 : int natts = 0;
376 : ObjectAddress myself,
377 : referenced;
378 988 : List *relids = NIL;
379 :
380 988 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
381 :
382 : /*
383 : * Check for duplicates. Note that this does not really prevent
384 : * duplicates, it's here just to provide nicer error message in common
385 : * case. The real protection is the unique key on the catalog.
386 : */
387 988 : if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
388 : ObjectIdGetDatum(pubid)))
389 : {
390 22 : table_close(rel, RowExclusiveLock);
391 :
392 22 : if (if_not_exists)
393 16 : return InvalidObjectAddress;
394 :
395 6 : ereport(ERROR,
396 : (errcode(ERRCODE_DUPLICATE_OBJECT),
397 : errmsg("relation \"%s\" is already member of publication \"%s\"",
398 : RelationGetRelationName(targetrel), pub->name)));
399 : }
400 :
401 966 : check_publication_add_relation(targetrel);
402 :
403 : /*
404 : * Translate column names to attnums and make sure the column list
405 : * contains only allowed elements (no system or generated columns etc.).
406 : * Also build an array of attnums, for storing in the catalog.
407 : */
408 934 : publication_translate_columns(pri->relation, pri->columns,
409 : &natts, &attarray);
410 :
411 : /* Form a tuple. */
412 916 : memset(values, 0, sizeof(values));
413 916 : memset(nulls, false, sizeof(nulls));
414 :
415 916 : pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
416 : Anum_pg_publication_rel_oid);
417 916 : values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid);
418 916 : values[Anum_pg_publication_rel_prpubid - 1] =
419 916 : ObjectIdGetDatum(pubid);
420 916 : values[Anum_pg_publication_rel_prrelid - 1] =
421 916 : ObjectIdGetDatum(relid);
422 :
423 : /* Add qualifications, if available */
424 916 : if (pri->whereClause != NULL)
425 298 : values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
426 : else
427 618 : nulls[Anum_pg_publication_rel_prqual - 1] = true;
428 :
429 : /* Add column list, if available */
430 916 : if (pri->columns)
431 272 : values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(buildint2vector(attarray, natts));
432 : else
433 644 : nulls[Anum_pg_publication_rel_prattrs - 1] = true;
434 :
435 916 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
436 :
437 : /* Insert tuple into catalog. */
438 916 : CatalogTupleInsert(rel, tup);
439 916 : heap_freetuple(tup);
440 :
441 : /* Register dependencies as needed */
442 916 : ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
443 :
444 : /* Add dependency on the publication */
445 916 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
446 916 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
447 :
448 : /* Add dependency on the relation */
449 916 : ObjectAddressSet(referenced, RelationRelationId, relid);
450 916 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
451 :
452 : /* Add dependency on the objects mentioned in the qualifications */
453 916 : if (pri->whereClause)
454 298 : recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
455 : DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
456 : false);
457 :
458 : /* Add dependency on the columns, if any are listed */
459 1392 : for (int i = 0; i < natts; i++)
460 : {
461 476 : ObjectAddressSubSet(referenced, RelationRelationId, relid, attarray[i]);
462 476 : recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
463 : }
464 :
465 : /* Close the table. */
466 916 : table_close(rel, RowExclusiveLock);
467 :
468 : /*
469 : * Invalidate relcache so that publication info is rebuilt.
470 : *
471 : * For the partitioned tables, we must invalidate all partitions contained
472 : * in the respective partition hierarchies, not just the one explicitly
473 : * mentioned in the publication. This is required because we implicitly
474 : * publish the child tables when the parent table is published.
475 : */
476 916 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
477 : relid);
478 :
479 916 : InvalidatePublicationRels(relids);
480 :
481 916 : return myself;
482 : }
483 :
484 : /* qsort comparator for attnums */
485 : static int
486 204 : compare_int16(const void *a, const void *b)
487 : {
488 204 : int av = *(const int16 *) a;
489 204 : int bv = *(const int16 *) b;
490 :
491 : /* this can't overflow if int is wider than int16 */
492 204 : return (av - bv);
493 : }
494 :
495 : /*
496 : * Translate a list of column names to an array of attribute numbers
497 : * and a Bitmapset with them; verify that each attribute is appropriate
498 : * to have in a publication column list (no system or generated attributes,
499 : * no duplicates). Additional checks with replica identity are done later;
500 : * see pub_collist_contains_invalid_column.
501 : *
502 : * Note that the attribute numbers are *not* offset by
503 : * FirstLowInvalidHeapAttributeNumber; system columns are forbidden so this
504 : * is okay.
505 : */
506 : static void
507 934 : publication_translate_columns(Relation targetrel, List *columns,
508 : int *natts, AttrNumber **attrs)
509 : {
510 934 : AttrNumber *attarray = NULL;
511 934 : Bitmapset *set = NULL;
512 : ListCell *lc;
513 934 : int n = 0;
514 934 : TupleDesc tupdesc = RelationGetDescr(targetrel);
515 :
516 : /* Bail out when no column list defined. */
517 934 : if (!columns)
518 644 : return;
519 :
520 : /*
521 : * Translate list of columns to attnums. We prohibit system attributes and
522 : * make sure there are no duplicate columns.
523 : */
524 290 : attarray = palloc(sizeof(AttrNumber) * list_length(columns));
525 784 : foreach(lc, columns)
526 : {
527 512 : char *colname = strVal(lfirst(lc));
528 512 : AttrNumber attnum = get_attnum(RelationGetRelid(targetrel), colname);
529 :
530 512 : if (attnum == InvalidAttrNumber)
531 6 : ereport(ERROR,
532 : errcode(ERRCODE_UNDEFINED_COLUMN),
533 : errmsg("column \"%s\" of relation \"%s\" does not exist",
534 : colname, RelationGetRelationName(targetrel)));
535 :
536 506 : if (!AttrNumberIsForUserDefinedAttr(attnum))
537 6 : ereport(ERROR,
538 : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
539 : errmsg("cannot use system column \"%s\" in publication column list",
540 : colname));
541 :
542 500 : if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated)
543 6 : ereport(ERROR,
544 : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
545 : errmsg("cannot use generated column \"%s\" in publication column list",
546 : colname));
547 :
548 494 : if (bms_is_member(attnum, set))
549 0 : ereport(ERROR,
550 : errcode(ERRCODE_DUPLICATE_OBJECT),
551 : errmsg("duplicate column \"%s\" in publication column list",
552 : colname));
553 :
554 494 : set = bms_add_member(set, attnum);
555 494 : attarray[n++] = attnum;
556 : }
557 :
558 : /* Be tidy, so that the catalog representation is always sorted */
559 272 : qsort(attarray, n, sizeof(AttrNumber), compare_int16);
560 :
561 272 : *natts = n;
562 272 : *attrs = attarray;
563 :
564 272 : bms_free(set);
565 : }
566 :
567 : /*
568 : * Transform a column list (represented by an array Datum) to a bitmapset.
569 : *
570 : * If columns isn't NULL, add the column numbers to that set.
571 : *
572 : * If mcxt isn't NULL, build the bitmapset in that context.
573 : */
574 : Bitmapset *
575 428 : pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
576 : {
577 428 : Bitmapset *result = NULL;
578 : ArrayType *arr;
579 : int nelems;
580 : int16 *elems;
581 428 : MemoryContext oldcxt = NULL;
582 :
583 : /*
584 : * If an existing bitmap was provided, use it. Otherwise just use NULL and
585 : * build a new bitmap.
586 : */
587 428 : if (columns)
588 0 : result = columns;
589 :
590 428 : arr = DatumGetArrayTypeP(pubcols);
591 428 : nelems = ARR_DIMS(arr)[0];
592 428 : elems = (int16 *) ARR_DATA_PTR(arr);
593 :
594 : /* If a memory context was specified, switch to it. */
595 428 : if (mcxt)
596 74 : oldcxt = MemoryContextSwitchTo(mcxt);
597 :
598 1188 : for (int i = 0; i < nelems; i++)
599 760 : result = bms_add_member(result, elems[i]);
600 :
601 428 : if (mcxt)
602 74 : MemoryContextSwitchTo(oldcxt);
603 :
604 428 : return result;
605 : }
606 :
607 : /*
608 : * Insert new publication / schema mapping.
609 : */
610 : ObjectAddress
611 222 : publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
612 : {
613 : Relation rel;
614 : HeapTuple tup;
615 : Datum values[Natts_pg_publication_namespace];
616 : bool nulls[Natts_pg_publication_namespace];
617 : Oid psschid;
618 222 : Publication *pub = GetPublication(pubid);
619 222 : List *schemaRels = NIL;
620 : ObjectAddress myself,
621 : referenced;
622 :
623 222 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
624 :
625 : /*
626 : * Check for duplicates. Note that this does not really prevent
627 : * duplicates, it's here just to provide nicer error message in common
628 : * case. The real protection is the unique key on the catalog.
629 : */
630 222 : if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
631 : ObjectIdGetDatum(schemaid),
632 : ObjectIdGetDatum(pubid)))
633 : {
634 18 : table_close(rel, RowExclusiveLock);
635 :
636 18 : if (if_not_exists)
637 12 : return InvalidObjectAddress;
638 :
639 6 : ereport(ERROR,
640 : (errcode(ERRCODE_DUPLICATE_OBJECT),
641 : errmsg("schema \"%s\" is already member of publication \"%s\"",
642 : get_namespace_name(schemaid), pub->name)));
643 : }
644 :
645 204 : check_publication_add_schema(schemaid);
646 :
647 : /* Form a tuple */
648 198 : memset(values, 0, sizeof(values));
649 198 : memset(nulls, false, sizeof(nulls));
650 :
651 198 : psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId,
652 : Anum_pg_publication_namespace_oid);
653 198 : values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid);
654 198 : values[Anum_pg_publication_namespace_pnpubid - 1] =
655 198 : ObjectIdGetDatum(pubid);
656 198 : values[Anum_pg_publication_namespace_pnnspid - 1] =
657 198 : ObjectIdGetDatum(schemaid);
658 :
659 198 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
660 :
661 : /* Insert tuple into catalog */
662 198 : CatalogTupleInsert(rel, tup);
663 198 : heap_freetuple(tup);
664 :
665 198 : ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid);
666 :
667 : /* Add dependency on the publication */
668 198 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
669 198 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
670 :
671 : /* Add dependency on the schema */
672 198 : ObjectAddressSet(referenced, NamespaceRelationId, schemaid);
673 198 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
674 :
675 : /* Close the table */
676 198 : table_close(rel, RowExclusiveLock);
677 :
678 : /*
679 : * Invalidate relcache so that publication info is rebuilt. See
680 : * publication_add_relation for why we need to consider all the
681 : * partitions.
682 : */
683 198 : schemaRels = GetSchemaPublicationRelations(schemaid,
684 : PUBLICATION_PART_ALL);
685 198 : InvalidatePublicationRels(schemaRels);
686 :
687 198 : return myself;
688 : }
689 :
690 : /* Gets list of publication oids for a relation */
691 : List *
692 10584 : GetRelationPublications(Oid relid)
693 : {
694 10584 : List *result = NIL;
695 : CatCList *pubrellist;
696 : int i;
697 :
698 : /* Find all publications associated with the relation. */
699 10584 : pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
700 : ObjectIdGetDatum(relid));
701 11968 : for (i = 0; i < pubrellist->n_members; i++)
702 : {
703 1384 : HeapTuple tup = &pubrellist->members[i]->tuple;
704 1384 : Oid pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
705 :
706 1384 : result = lappend_oid(result, pubid);
707 : }
708 :
709 10584 : ReleaseSysCacheList(pubrellist);
710 :
711 10584 : return result;
712 : }
713 :
714 : /*
715 : * Gets list of relation oids for a publication.
716 : *
717 : * This should only be used FOR TABLE publications, the FOR ALL TABLES
718 : * should use GetAllTablesPublicationRelations().
719 : */
720 : List *
721 2014 : GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
722 : {
723 : List *result;
724 : Relation pubrelsrel;
725 : ScanKeyData scankey;
726 : SysScanDesc scan;
727 : HeapTuple tup;
728 :
729 : /* Find all publications associated with the relation. */
730 2014 : pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
731 :
732 2014 : ScanKeyInit(&scankey,
733 : Anum_pg_publication_rel_prpubid,
734 : BTEqualStrategyNumber, F_OIDEQ,
735 : ObjectIdGetDatum(pubid));
736 :
737 2014 : scan = systable_beginscan(pubrelsrel, PublicationRelPrpubidIndexId,
738 : true, NULL, 1, &scankey);
739 :
740 2014 : result = NIL;
741 4712 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
742 : {
743 : Form_pg_publication_rel pubrel;
744 :
745 2698 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
746 2698 : result = GetPubPartitionOptionRelations(result, pub_partopt,
747 : pubrel->prrelid);
748 : }
749 :
750 2014 : systable_endscan(scan);
751 2014 : table_close(pubrelsrel, AccessShareLock);
752 :
753 : /* Now sort and de-duplicate the result list */
754 2014 : list_sort(result, list_oid_cmp);
755 2014 : list_deduplicate_oid(result);
756 :
757 2014 : return result;
758 : }
759 :
760 : /*
761 : * Gets list of publication oids for publications marked as FOR ALL TABLES.
762 : */
763 : List *
764 7322 : GetAllTablesPublications(void)
765 : {
766 : List *result;
767 : Relation rel;
768 : ScanKeyData scankey;
769 : SysScanDesc scan;
770 : HeapTuple tup;
771 :
772 : /* Find all publications that are marked as for all tables. */
773 7322 : rel = table_open(PublicationRelationId, AccessShareLock);
774 :
775 7322 : ScanKeyInit(&scankey,
776 : Anum_pg_publication_puballtables,
777 : BTEqualStrategyNumber, F_BOOLEQ,
778 : BoolGetDatum(true));
779 :
780 7322 : scan = systable_beginscan(rel, InvalidOid, false,
781 : NULL, 1, &scankey);
782 :
783 7322 : result = NIL;
784 7468 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
785 : {
786 146 : Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
787 :
788 146 : result = lappend_oid(result, oid);
789 : }
790 :
791 7322 : systable_endscan(scan);
792 7322 : table_close(rel, AccessShareLock);
793 :
794 7322 : return result;
795 : }
796 :
797 : /*
798 : * Gets list of all relation published by FOR ALL TABLES publication(s).
799 : *
800 : * If the publication publishes partition changes via their respective root
801 : * partitioned tables, we must exclude partitions in favor of including the
802 : * root partitioned tables.
803 : */
804 : List *
805 254 : GetAllTablesPublicationRelations(bool pubviaroot)
806 : {
807 : Relation classRel;
808 : ScanKeyData key[1];
809 : TableScanDesc scan;
810 : HeapTuple tuple;
811 254 : List *result = NIL;
812 :
813 254 : classRel = table_open(RelationRelationId, AccessShareLock);
814 :
815 254 : ScanKeyInit(&key[0],
816 : Anum_pg_class_relkind,
817 : BTEqualStrategyNumber, F_CHAREQ,
818 : CharGetDatum(RELKIND_RELATION));
819 :
820 254 : scan = table_beginscan_catalog(classRel, 1, key);
821 :
822 18930 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
823 : {
824 18676 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
825 18676 : Oid relid = relForm->oid;
826 :
827 18676 : if (is_publishable_class(relid, relForm) &&
828 1392 : !(relForm->relispartition && pubviaroot))
829 1210 : result = lappend_oid(result, relid);
830 : }
831 :
832 254 : table_endscan(scan);
833 :
834 254 : if (pubviaroot)
835 : {
836 26 : ScanKeyInit(&key[0],
837 : Anum_pg_class_relkind,
838 : BTEqualStrategyNumber, F_CHAREQ,
839 : CharGetDatum(RELKIND_PARTITIONED_TABLE));
840 :
841 26 : scan = table_beginscan_catalog(classRel, 1, key);
842 :
843 156 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
844 : {
845 130 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
846 130 : Oid relid = relForm->oid;
847 :
848 130 : if (is_publishable_class(relid, relForm) &&
849 130 : !relForm->relispartition)
850 104 : result = lappend_oid(result, relid);
851 : }
852 :
853 26 : table_endscan(scan);
854 : }
855 :
856 254 : table_close(classRel, AccessShareLock);
857 254 : return result;
858 : }
859 :
860 : /*
861 : * Gets the list of schema oids for a publication.
862 : *
863 : * This should only be used FOR TABLES IN SCHEMA publications.
864 : */
865 : List *
866 1944 : GetPublicationSchemas(Oid pubid)
867 : {
868 1944 : List *result = NIL;
869 : Relation pubschsrel;
870 : ScanKeyData scankey;
871 : SysScanDesc scan;
872 : HeapTuple tup;
873 :
874 : /* Find all schemas associated with the publication */
875 1944 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
876 :
877 1944 : ScanKeyInit(&scankey,
878 : Anum_pg_publication_namespace_pnpubid,
879 : BTEqualStrategyNumber, F_OIDEQ,
880 : ObjectIdGetDatum(pubid));
881 :
882 1944 : scan = systable_beginscan(pubschsrel,
883 : PublicationNamespacePnnspidPnpubidIndexId,
884 : true, NULL, 1, &scankey);
885 2044 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
886 : {
887 : Form_pg_publication_namespace pubsch;
888 :
889 100 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
890 :
891 100 : result = lappend_oid(result, pubsch->pnnspid);
892 : }
893 :
894 1944 : systable_endscan(scan);
895 1944 : table_close(pubschsrel, AccessShareLock);
896 :
897 1944 : return result;
898 : }
899 :
900 : /*
901 : * Gets the list of publication oids associated with a specified schema.
902 : */
903 : List *
904 10260 : GetSchemaPublications(Oid schemaid)
905 : {
906 10260 : List *result = NIL;
907 : CatCList *pubschlist;
908 : int i;
909 :
910 : /* Find all publications associated with the schema */
911 10260 : pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
912 : ObjectIdGetDatum(schemaid));
913 10374 : for (i = 0; i < pubschlist->n_members; i++)
914 : {
915 114 : HeapTuple tup = &pubschlist->members[i]->tuple;
916 114 : Oid pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
917 :
918 114 : result = lappend_oid(result, pubid);
919 : }
920 :
921 10260 : ReleaseSysCacheList(pubschlist);
922 :
923 10260 : return result;
924 : }
925 :
926 : /*
927 : * Get the list of publishable relation oids for a specified schema.
928 : */
929 : List *
930 460 : GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
931 : {
932 : Relation classRel;
933 : ScanKeyData key[1];
934 : TableScanDesc scan;
935 : HeapTuple tuple;
936 460 : List *result = NIL;
937 :
938 : Assert(OidIsValid(schemaid));
939 :
940 460 : classRel = table_open(RelationRelationId, AccessShareLock);
941 :
942 460 : ScanKeyInit(&key[0],
943 : Anum_pg_class_relnamespace,
944 : BTEqualStrategyNumber, F_OIDEQ,
945 : schemaid);
946 :
947 : /* get all the relations present in the specified schema */
948 460 : scan = table_beginscan_catalog(classRel, 1, key);
949 23600 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
950 : {
951 23140 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
952 23140 : Oid relid = relForm->oid;
953 : char relkind;
954 :
955 23140 : if (!is_publishable_class(relid, relForm))
956 9232 : continue;
957 :
958 13908 : relkind = get_rel_relkind(relid);
959 13908 : if (relkind == RELKIND_RELATION)
960 13368 : result = lappend_oid(result, relid);
961 540 : else if (relkind == RELKIND_PARTITIONED_TABLE)
962 : {
963 540 : List *partitionrels = NIL;
964 :
965 : /*
966 : * It is quite possible that some of the partitions are in a
967 : * different schema than the parent table, so we need to get such
968 : * partitions separately.
969 : */
970 540 : partitionrels = GetPubPartitionOptionRelations(partitionrels,
971 : pub_partopt,
972 : relForm->oid);
973 540 : result = list_concat_unique_oid(result, partitionrels);
974 : }
975 : }
976 :
977 460 : table_endscan(scan);
978 460 : table_close(classRel, AccessShareLock);
979 460 : return result;
980 : }
981 :
982 : /*
983 : * Gets the list of all relations published by FOR TABLES IN SCHEMA
984 : * publication.
985 : */
986 : List *
987 1552 : GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
988 : {
989 1552 : List *result = NIL;
990 1552 : List *pubschemalist = GetPublicationSchemas(pubid);
991 : ListCell *cell;
992 :
993 1622 : foreach(cell, pubschemalist)
994 : {
995 70 : Oid schemaid = lfirst_oid(cell);
996 70 : List *schemaRels = NIL;
997 :
998 70 : schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt);
999 70 : result = list_concat(result, schemaRels);
1000 : }
1001 :
1002 1552 : return result;
1003 : }
1004 :
1005 : /*
1006 : * Get publication using oid
1007 : *
1008 : * The Publication struct and its data are palloc'ed here.
1009 : */
1010 : Publication *
1011 6890 : GetPublication(Oid pubid)
1012 : {
1013 : HeapTuple tup;
1014 : Publication *pub;
1015 : Form_pg_publication pubform;
1016 :
1017 6890 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1018 6890 : if (!HeapTupleIsValid(tup))
1019 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1020 :
1021 6890 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1022 :
1023 6890 : pub = (Publication *) palloc(sizeof(Publication));
1024 6890 : pub->oid = pubid;
1025 6890 : pub->name = pstrdup(NameStr(pubform->pubname));
1026 6890 : pub->alltables = pubform->puballtables;
1027 6890 : pub->pubactions.pubinsert = pubform->pubinsert;
1028 6890 : pub->pubactions.pubupdate = pubform->pubupdate;
1029 6890 : pub->pubactions.pubdelete = pubform->pubdelete;
1030 6890 : pub->pubactions.pubtruncate = pubform->pubtruncate;
1031 6890 : pub->pubviaroot = pubform->pubviaroot;
1032 :
1033 6890 : ReleaseSysCache(tup);
1034 :
1035 6890 : return pub;
1036 : }
1037 :
1038 : /*
1039 : * Get Publication using name.
1040 : */
1041 : Publication *
1042 2086 : GetPublicationByName(const char *pubname, bool missing_ok)
1043 : {
1044 : Oid oid;
1045 :
1046 2086 : oid = get_publication_oid(pubname, missing_ok);
1047 :
1048 2084 : return OidIsValid(oid) ? GetPublication(oid) : NULL;
1049 : }
1050 :
1051 : /*
1052 : * Get information of the tables in the given publication array.
1053 : *
1054 : * Returns pubid, relid, column list, row filter for each table.
1055 : */
1056 : Datum
1057 5224 : pg_get_publication_tables(PG_FUNCTION_ARGS)
1058 : {
1059 : #define NUM_PUBLICATION_TABLES_ELEM 4
1060 : FuncCallContext *funcctx;
1061 5224 : List *table_infos = NIL;
1062 :
1063 : /* stuff done only on the first call of the function */
1064 5224 : if (SRF_IS_FIRSTCALL())
1065 : {
1066 : TupleDesc tupdesc;
1067 : MemoryContext oldcontext;
1068 : ArrayType *arr;
1069 : Datum *elems;
1070 : int nelems,
1071 : i;
1072 1628 : bool viaroot = false;
1073 :
1074 : /* create a function context for cross-call persistence */
1075 1628 : funcctx = SRF_FIRSTCALL_INIT();
1076 :
1077 : /* switch to memory context appropriate for multiple function calls */
1078 1628 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1079 :
1080 : /*
1081 : * Deconstruct the parameter into elements where each element is a
1082 : * publication name.
1083 : */
1084 1628 : arr = PG_GETARG_ARRAYTYPE_P(0);
1085 1628 : deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT,
1086 : &elems, NULL, &nelems);
1087 :
1088 : /* Get Oids of tables from each publication. */
1089 3344 : for (i = 0; i < nelems; i++)
1090 : {
1091 : Publication *pub_elem;
1092 1716 : List *pub_elem_tables = NIL;
1093 : ListCell *lc;
1094 :
1095 1716 : pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]), false);
1096 :
1097 : /*
1098 : * Publications support partitioned tables. If
1099 : * publish_via_partition_root is false, all changes are replicated
1100 : * using leaf partition identity and schema, so we only need
1101 : * those. Otherwise, get the partitioned table itself.
1102 : */
1103 1716 : if (pub_elem->alltables)
1104 254 : pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot);
1105 : else
1106 : {
1107 : List *relids,
1108 : *schemarelids;
1109 :
1110 1462 : relids = GetPublicationRelations(pub_elem->oid,
1111 1462 : pub_elem->pubviaroot ?
1112 1462 : PUBLICATION_PART_ROOT :
1113 : PUBLICATION_PART_LEAF);
1114 1462 : schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
1115 1462 : pub_elem->pubviaroot ?
1116 1462 : PUBLICATION_PART_ROOT :
1117 : PUBLICATION_PART_LEAF);
1118 1462 : pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
1119 : }
1120 :
1121 : /*
1122 : * Record the published table and the corresponding publication so
1123 : * that we can get row filters and column lists later.
1124 : *
1125 : * When a table is published by multiple publications, to obtain
1126 : * all row filters and column lists, the structure related to this
1127 : * table will be recorded multiple times.
1128 : */
1129 5392 : foreach(lc, pub_elem_tables)
1130 : {
1131 3676 : published_rel *table_info = (published_rel *) palloc(sizeof(published_rel));
1132 :
1133 3676 : table_info->relid = lfirst_oid(lc);
1134 3676 : table_info->pubid = pub_elem->oid;
1135 3676 : table_infos = lappend(table_infos, table_info);
1136 : }
1137 :
1138 : /* At least one publication is using publish_via_partition_root. */
1139 1716 : if (pub_elem->pubviaroot)
1140 336 : viaroot = true;
1141 : }
1142 :
1143 : /*
1144 : * If the publication publishes partition changes via their respective
1145 : * root partitioned tables, we must exclude partitions in favor of
1146 : * including the root partitioned tables. Otherwise, the function
1147 : * could return both the child and parent tables which could cause
1148 : * data of the child table to be double-published on the subscriber
1149 : * side.
1150 : */
1151 1628 : if (viaroot)
1152 320 : filter_partitions(table_infos);
1153 :
1154 : /* Construct a tuple descriptor for the result rows. */
1155 1628 : tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
1156 1628 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
1157 : OIDOID, -1, 0);
1158 1628 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
1159 : OIDOID, -1, 0);
1160 1628 : TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
1161 : INT2VECTOROID, -1, 0);
1162 1628 : TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
1163 : PG_NODE_TREEOID, -1, 0);
1164 :
1165 1628 : funcctx->tuple_desc = BlessTupleDesc(tupdesc);
1166 1628 : funcctx->user_fctx = (void *) table_infos;
1167 :
1168 1628 : MemoryContextSwitchTo(oldcontext);
1169 : }
1170 :
1171 : /* stuff done on every call of the function */
1172 5224 : funcctx = SRF_PERCALL_SETUP();
1173 5224 : table_infos = (List *) funcctx->user_fctx;
1174 :
1175 5224 : if (funcctx->call_cntr < list_length(table_infos))
1176 : {
1177 3596 : HeapTuple pubtuple = NULL;
1178 : HeapTuple rettuple;
1179 : Publication *pub;
1180 3596 : published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
1181 3596 : Oid relid = table_info->relid;
1182 3596 : Oid schemaid = get_rel_namespace(relid);
1183 3596 : Datum values[NUM_PUBLICATION_TABLES_ELEM] = {0};
1184 3596 : bool nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
1185 :
1186 : /*
1187 : * Form tuple with appropriate data.
1188 : */
1189 :
1190 3596 : pub = GetPublication(table_info->pubid);
1191 :
1192 3596 : values[0] = ObjectIdGetDatum(pub->oid);
1193 3596 : values[1] = ObjectIdGetDatum(relid);
1194 :
1195 : /*
1196 : * We don't consider row filters or column lists for FOR ALL TABLES or
1197 : * FOR TABLES IN SCHEMA publications.
1198 : */
1199 3596 : if (!pub->alltables &&
1200 2282 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
1201 : ObjectIdGetDatum(schemaid),
1202 : ObjectIdGetDatum(pub->oid)))
1203 2188 : pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
1204 : ObjectIdGetDatum(relid),
1205 : ObjectIdGetDatum(pub->oid));
1206 :
1207 3596 : if (HeapTupleIsValid(pubtuple))
1208 : {
1209 : /* Lookup the column list attribute. */
1210 1970 : values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1211 : Anum_pg_publication_rel_prattrs,
1212 : &(nulls[2]));
1213 :
1214 : /* Null indicates no filter. */
1215 1970 : values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1216 : Anum_pg_publication_rel_prqual,
1217 : &(nulls[3]));
1218 : }
1219 : else
1220 : {
1221 1626 : nulls[2] = true;
1222 1626 : nulls[3] = true;
1223 : }
1224 :
1225 : /* Show all columns when the column list is not specified. */
1226 3596 : if (nulls[2])
1227 : {
1228 3360 : Relation rel = table_open(relid, AccessShareLock);
1229 3360 : int nattnums = 0;
1230 : int16 *attnums;
1231 3360 : TupleDesc desc = RelationGetDescr(rel);
1232 : int i;
1233 :
1234 3360 : attnums = (int16 *) palloc(desc->natts * sizeof(int16));
1235 :
1236 9120 : for (i = 0; i < desc->natts; i++)
1237 : {
1238 5760 : Form_pg_attribute att = TupleDescAttr(desc, i);
1239 :
1240 5760 : if (att->attisdropped || att->attgenerated)
1241 34 : continue;
1242 :
1243 5726 : attnums[nattnums++] = att->attnum;
1244 : }
1245 :
1246 3360 : if (nattnums > 0)
1247 : {
1248 3360 : values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
1249 3360 : nulls[2] = false;
1250 : }
1251 :
1252 3360 : table_close(rel, AccessShareLock);
1253 : }
1254 :
1255 3596 : rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
1256 :
1257 3596 : SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
1258 : }
1259 :
1260 1628 : SRF_RETURN_DONE(funcctx);
1261 : }
|