Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_publication.c
4 : * publication C API manipulation
5 : *
6 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/catalog/pg_publication.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/heapam.h"
19 : #include "access/htup_details.h"
20 : #include "access/tableam.h"
21 : #include "catalog/catalog.h"
22 : #include "catalog/dependency.h"
23 : #include "catalog/indexing.h"
24 : #include "catalog/namespace.h"
25 : #include "catalog/objectaddress.h"
26 : #include "catalog/partition.h"
27 : #include "catalog/pg_inherits.h"
28 : #include "catalog/pg_namespace.h"
29 : #include "catalog/pg_publication.h"
30 : #include "catalog/pg_publication_namespace.h"
31 : #include "catalog/pg_publication_rel.h"
32 : #include "catalog/pg_type.h"
33 : #include "commands/publicationcmds.h"
34 : #include "funcapi.h"
35 : #include "utils/array.h"
36 : #include "utils/builtins.h"
37 : #include "utils/catcache.h"
38 : #include "utils/fmgroids.h"
39 : #include "utils/lsyscache.h"
40 : #include "utils/rel.h"
41 : #include "utils/syscache.h"
42 :
43 : /* Records association between publication and published table */
44 : typedef struct
45 : {
46 : Oid relid; /* OID of published table */
47 : Oid pubid; /* OID of publication that publishes this
48 : * table. */
49 : } published_rel;
50 :
51 : static void publication_translate_columns(Relation targetrel, List *columns,
52 : int *natts, AttrNumber **attrs);
53 :
54 : /*
55 : * Check if relation can be in given publication and throws appropriate
56 : * error if not.
57 : */
58 : static void
59 986 : check_publication_add_relation(Relation targetrel)
60 : {
61 : /* Must be a regular or partitioned table */
62 986 : if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
63 142 : RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
64 14 : ereport(ERROR,
65 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
66 : errmsg("cannot add relation \"%s\" to publication",
67 : RelationGetRelationName(targetrel)),
68 : errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind)));
69 :
70 : /* Can't be system table */
71 972 : if (IsCatalogRelation(targetrel))
72 6 : ereport(ERROR,
73 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
74 : errmsg("cannot add relation \"%s\" to publication",
75 : RelationGetRelationName(targetrel)),
76 : errdetail("This operation is not supported for system tables.")));
77 :
78 : /* UNLOGGED and TEMP relations cannot be part of publication. */
79 966 : if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
80 6 : ereport(ERROR,
81 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
82 : errmsg("cannot add relation \"%s\" to publication",
83 : RelationGetRelationName(targetrel)),
84 : errdetail("This operation is not supported for temporary tables.")));
85 960 : else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
86 6 : ereport(ERROR,
87 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
88 : errmsg("cannot add relation \"%s\" to publication",
89 : RelationGetRelationName(targetrel)),
90 : errdetail("This operation is not supported for unlogged tables.")));
91 954 : }
92 :
93 : /*
94 : * Check if schema can be in given publication and throw appropriate error if
95 : * not.
96 : */
97 : static void
98 204 : check_publication_add_schema(Oid schemaid)
99 : {
100 : /* Can't be system namespace */
101 204 : if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid))
102 6 : ereport(ERROR,
103 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
104 : errmsg("cannot add schema \"%s\" to publication",
105 : get_namespace_name(schemaid)),
106 : errdetail("This operation is not supported for system schemas.")));
107 :
108 : /* Can't be temporary namespace */
109 198 : if (isAnyTempNamespace(schemaid))
110 0 : ereport(ERROR,
111 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
112 : errmsg("cannot add schema \"%s\" to publication",
113 : get_namespace_name(schemaid)),
114 : errdetail("Temporary schemas cannot be replicated.")));
115 198 : }
116 :
117 : /*
118 : * Returns if relation represented by oid and Form_pg_class entry
119 : * is publishable.
120 : *
121 : * Does same checks as check_publication_add_relation() above, but does not
122 : * need relation to be opened and also does not throw errors.
123 : *
124 : * XXX This also excludes all tables with relid < FirstNormalObjectId,
125 : * ie all tables created during initdb. This mainly affects the preinstalled
126 : * information_schema. IsCatalogRelationOid() only excludes tables with
127 : * relid < FirstUnpinnedObjectId, making that test rather redundant,
128 : * but really we should get rid of the FirstNormalObjectId test not
129 : * IsCatalogRelationOid. We can't do so today because we don't want
130 : * information_schema tables to be considered publishable; but this test
131 : * is really inadequate for that, since the information_schema could be
132 : * dropped and reloaded and then it'll be considered publishable. The best
133 : * long-term solution may be to add a "relispublishable" bool to pg_class,
134 : * and depend on that instead of OID checks.
135 : */
136 : static bool
137 591384 : is_publishable_class(Oid relid, Form_pg_class reltuple)
138 : {
139 604122 : return (reltuple->relkind == RELKIND_RELATION ||
140 12738 : reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
141 579912 : !IsCatalogRelationOid(relid) &&
142 1182768 : reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
143 : relid >= FirstNormalObjectId;
144 : }
145 :
146 : /*
147 : * Another variant of is_publishable_class(), taking a Relation.
148 : */
149 : bool
150 538252 : is_publishable_relation(Relation rel)
151 : {
152 538252 : return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
153 : }
154 :
155 : /*
156 : * SQL-callable variant of the above
157 : *
158 : * This returns null when the relation does not exist. This is intended to be
159 : * used for example in psql to avoid gratuitous errors when there are
160 : * concurrent catalog changes.
161 : */
162 : Datum
163 5316 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
164 : {
165 5316 : Oid relid = PG_GETARG_OID(0);
166 : HeapTuple tuple;
167 : bool result;
168 :
169 5316 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
170 5316 : if (!HeapTupleIsValid(tuple))
171 0 : PG_RETURN_NULL();
172 5316 : result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
173 5316 : ReleaseSysCache(tuple);
174 5316 : PG_RETURN_BOOL(result);
175 : }
176 :
177 : /*
178 : * Returns true if the ancestor is in the list of published relations.
179 : * Otherwise, returns false.
180 : */
181 : static bool
182 198 : is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
183 : {
184 : ListCell *lc;
185 :
186 678 : foreach(lc, table_infos)
187 : {
188 560 : Oid relid = ((published_rel *) lfirst(lc))->relid;
189 :
190 560 : if (relid == ancestor)
191 80 : return true;
192 : }
193 :
194 118 : return false;
195 : }
196 :
197 : /*
198 : * Filter out the partitions whose parent tables are also present in the list.
199 : */
200 : static void
201 336 : filter_partitions(List *table_infos)
202 : {
203 : ListCell *lc;
204 :
205 998 : foreach(lc, table_infos)
206 : {
207 662 : bool skip = false;
208 662 : List *ancestors = NIL;
209 : ListCell *lc2;
210 662 : published_rel *table_info = (published_rel *) lfirst(lc);
211 :
212 662 : if (get_rel_relispartition(table_info->relid))
213 198 : ancestors = get_partition_ancestors(table_info->relid);
214 :
215 780 : foreach(lc2, ancestors)
216 : {
217 198 : Oid ancestor = lfirst_oid(lc2);
218 :
219 198 : if (is_ancestor_member_tableinfos(ancestor, table_infos))
220 : {
221 80 : skip = true;
222 80 : break;
223 : }
224 : }
225 :
226 662 : if (skip)
227 80 : table_infos = foreach_delete_current(table_infos, lc);
228 : }
229 336 : }
230 :
231 : /*
232 : * Returns true if any schema is associated with the publication, false if no
233 : * schema is associated with the publication.
234 : */
235 : bool
236 256 : is_schema_publication(Oid pubid)
237 : {
238 : Relation pubschsrel;
239 : ScanKeyData scankey;
240 : SysScanDesc scan;
241 : HeapTuple tup;
242 256 : bool result = false;
243 :
244 256 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
245 256 : ScanKeyInit(&scankey,
246 : Anum_pg_publication_namespace_pnpubid,
247 : BTEqualStrategyNumber, F_OIDEQ,
248 : ObjectIdGetDatum(pubid));
249 :
250 256 : scan = systable_beginscan(pubschsrel,
251 : PublicationNamespacePnnspidPnpubidIndexId,
252 : true, NULL, 1, &scankey);
253 256 : tup = systable_getnext(scan);
254 256 : result = HeapTupleIsValid(tup);
255 :
256 256 : systable_endscan(scan);
257 256 : table_close(pubschsrel, AccessShareLock);
258 :
259 256 : return result;
260 : }
261 :
262 : /*
263 : * Gets the relations based on the publication partition option for a specified
264 : * relation.
265 : */
266 : List *
267 5204 : GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
268 : Oid relid)
269 : {
270 5204 : if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
271 : pub_partopt != PUBLICATION_PART_ROOT)
272 1032 : {
273 1032 : List *all_parts = find_all_inheritors(relid, NoLock,
274 : NULL);
275 :
276 1032 : if (pub_partopt == PUBLICATION_PART_ALL)
277 838 : result = list_concat(result, all_parts);
278 194 : else if (pub_partopt == PUBLICATION_PART_LEAF)
279 : {
280 : ListCell *lc;
281 :
282 710 : foreach(lc, all_parts)
283 : {
284 516 : Oid partOid = lfirst_oid(lc);
285 :
286 516 : if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
287 322 : result = lappend_oid(result, partOid);
288 : }
289 : }
290 : else
291 : Assert(false);
292 : }
293 : else
294 4172 : result = lappend_oid(result, relid);
295 :
296 5204 : return result;
297 : }
298 :
299 : /*
300 : * Returns the relid of the topmost ancestor that is published via this
301 : * publication if any and set its ancestor level to ancestor_level,
302 : * otherwise returns InvalidOid.
303 : *
304 : * The ancestor_level value allows us to compare the results for multiple
305 : * publications, and decide which value is higher up.
306 : *
307 : * Note that the list of ancestors should be ordered such that the topmost
308 : * ancestor is at the end of the list.
309 : */
310 : Oid
311 432 : GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
312 : {
313 : ListCell *lc;
314 432 : Oid topmost_relid = InvalidOid;
315 432 : int level = 0;
316 :
317 : /*
318 : * Find the "topmost" ancestor that is in this publication.
319 : */
320 878 : foreach(lc, ancestors)
321 : {
322 446 : Oid ancestor = lfirst_oid(lc);
323 446 : List *apubids = GetRelationPublications(ancestor);
324 446 : List *aschemaPubids = NIL;
325 :
326 446 : level++;
327 :
328 446 : if (list_member_oid(apubids, puboid))
329 : {
330 292 : topmost_relid = ancestor;
331 :
332 292 : if (ancestor_level)
333 70 : *ancestor_level = level;
334 : }
335 : else
336 : {
337 154 : aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
338 154 : if (list_member_oid(aschemaPubids, puboid))
339 : {
340 10 : topmost_relid = ancestor;
341 :
342 10 : if (ancestor_level)
343 10 : *ancestor_level = level;
344 : }
345 : }
346 :
347 446 : list_free(apubids);
348 446 : list_free(aschemaPubids);
349 : }
350 :
351 432 : return topmost_relid;
352 : }
353 :
354 : /*
355 : * Insert new publication / relation mapping.
356 : */
357 : ObjectAddress
358 1008 : publication_add_relation(Oid pubid, PublicationRelInfo *pri,
359 : bool if_not_exists)
360 : {
361 : Relation rel;
362 : HeapTuple tup;
363 : Datum values[Natts_pg_publication_rel];
364 : bool nulls[Natts_pg_publication_rel];
365 1008 : Relation targetrel = pri->relation;
366 1008 : Oid relid = RelationGetRelid(targetrel);
367 : Oid pubreloid;
368 1008 : Publication *pub = GetPublication(pubid);
369 1008 : AttrNumber *attarray = NULL;
370 1008 : int natts = 0;
371 : ObjectAddress myself,
372 : referenced;
373 1008 : List *relids = NIL;
374 :
375 1008 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
376 :
377 : /*
378 : * Check for duplicates. Note that this does not really prevent
379 : * duplicates, it's here just to provide nicer error message in common
380 : * case. The real protection is the unique key on the catalog.
381 : */
382 1008 : if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
383 : ObjectIdGetDatum(pubid)))
384 : {
385 22 : table_close(rel, RowExclusiveLock);
386 :
387 22 : if (if_not_exists)
388 16 : return InvalidObjectAddress;
389 :
390 6 : ereport(ERROR,
391 : (errcode(ERRCODE_DUPLICATE_OBJECT),
392 : errmsg("relation \"%s\" is already member of publication \"%s\"",
393 : RelationGetRelationName(targetrel), pub->name)));
394 : }
395 :
396 986 : check_publication_add_relation(targetrel);
397 :
398 : /*
399 : * Translate column names to attnums and make sure the column list
400 : * contains only allowed elements (no system or generated columns etc.).
401 : * Also build an array of attnums, for storing in the catalog.
402 : */
403 954 : publication_translate_columns(pri->relation, pri->columns,
404 : &natts, &attarray);
405 :
406 : /* Form a tuple. */
407 936 : memset(values, 0, sizeof(values));
408 936 : memset(nulls, false, sizeof(nulls));
409 :
410 936 : pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
411 : Anum_pg_publication_rel_oid);
412 936 : values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid);
413 936 : values[Anum_pg_publication_rel_prpubid - 1] =
414 936 : ObjectIdGetDatum(pubid);
415 936 : values[Anum_pg_publication_rel_prrelid - 1] =
416 936 : ObjectIdGetDatum(relid);
417 :
418 : /* Add qualifications, if available */
419 936 : if (pri->whereClause != NULL)
420 298 : values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
421 : else
422 638 : nulls[Anum_pg_publication_rel_prqual - 1] = true;
423 :
424 : /* Add column list, if available */
425 936 : if (pri->columns)
426 272 : values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(buildint2vector(attarray, natts));
427 : else
428 664 : nulls[Anum_pg_publication_rel_prattrs - 1] = true;
429 :
430 936 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
431 :
432 : /* Insert tuple into catalog. */
433 936 : CatalogTupleInsert(rel, tup);
434 936 : heap_freetuple(tup);
435 :
436 : /* Register dependencies as needed */
437 936 : ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
438 :
439 : /* Add dependency on the publication */
440 936 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
441 936 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
442 :
443 : /* Add dependency on the relation */
444 936 : ObjectAddressSet(referenced, RelationRelationId, relid);
445 936 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
446 :
447 : /* Add dependency on the objects mentioned in the qualifications */
448 936 : if (pri->whereClause)
449 298 : recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
450 : DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
451 : false);
452 :
453 : /* Add dependency on the columns, if any are listed */
454 1412 : for (int i = 0; i < natts; i++)
455 : {
456 476 : ObjectAddressSubSet(referenced, RelationRelationId, relid, attarray[i]);
457 476 : recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
458 : }
459 :
460 : /* Close the table. */
461 936 : table_close(rel, RowExclusiveLock);
462 :
463 : /*
464 : * Invalidate relcache so that publication info is rebuilt.
465 : *
466 : * For the partitioned tables, we must invalidate all partitions contained
467 : * in the respective partition hierarchies, not just the one explicitly
468 : * mentioned in the publication. This is required because we implicitly
469 : * publish the child tables when the parent table is published.
470 : */
471 936 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
472 : relid);
473 :
474 936 : InvalidatePublicationRels(relids);
475 :
476 936 : return myself;
477 : }
478 :
479 : /* qsort comparator for attnums */
480 : static int
481 204 : compare_int16(const void *a, const void *b)
482 : {
483 204 : int av = *(const int16 *) a;
484 204 : int bv = *(const int16 *) b;
485 :
486 : /* this can't overflow if int is wider than int16 */
487 204 : return (av - bv);
488 : }
489 :
490 : /*
491 : * Translate a list of column names to an array of attribute numbers
492 : * and a Bitmapset with them; verify that each attribute is appropriate
493 : * to have in a publication column list (no system or generated attributes,
494 : * no duplicates). Additional checks with replica identity are done later;
495 : * see pub_collist_contains_invalid_column.
496 : *
497 : * Note that the attribute numbers are *not* offset by
498 : * FirstLowInvalidHeapAttributeNumber; system columns are forbidden so this
499 : * is okay.
500 : */
501 : static void
502 954 : publication_translate_columns(Relation targetrel, List *columns,
503 : int *natts, AttrNumber **attrs)
504 : {
505 954 : AttrNumber *attarray = NULL;
506 954 : Bitmapset *set = NULL;
507 : ListCell *lc;
508 954 : int n = 0;
509 954 : TupleDesc tupdesc = RelationGetDescr(targetrel);
510 :
511 : /* Bail out when no column list defined. */
512 954 : if (!columns)
513 664 : return;
514 :
515 : /*
516 : * Translate list of columns to attnums. We prohibit system attributes and
517 : * make sure there are no duplicate columns.
518 : */
519 290 : attarray = palloc(sizeof(AttrNumber) * list_length(columns));
520 784 : foreach(lc, columns)
521 : {
522 512 : char *colname = strVal(lfirst(lc));
523 512 : AttrNumber attnum = get_attnum(RelationGetRelid(targetrel), colname);
524 :
525 512 : if (attnum == InvalidAttrNumber)
526 6 : ereport(ERROR,
527 : errcode(ERRCODE_UNDEFINED_COLUMN),
528 : errmsg("column \"%s\" of relation \"%s\" does not exist",
529 : colname, RelationGetRelationName(targetrel)));
530 :
531 506 : if (!AttrNumberIsForUserDefinedAttr(attnum))
532 6 : ereport(ERROR,
533 : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
534 : errmsg("cannot use system column \"%s\" in publication column list",
535 : colname));
536 :
537 500 : if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated)
538 6 : ereport(ERROR,
539 : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
540 : errmsg("cannot use generated column \"%s\" in publication column list",
541 : colname));
542 :
543 494 : if (bms_is_member(attnum, set))
544 0 : ereport(ERROR,
545 : errcode(ERRCODE_DUPLICATE_OBJECT),
546 : errmsg("duplicate column \"%s\" in publication column list",
547 : colname));
548 :
549 494 : set = bms_add_member(set, attnum);
550 494 : attarray[n++] = attnum;
551 : }
552 :
553 : /* Be tidy, so that the catalog representation is always sorted */
554 272 : qsort(attarray, n, sizeof(AttrNumber), compare_int16);
555 :
556 272 : *natts = n;
557 272 : *attrs = attarray;
558 :
559 272 : bms_free(set);
560 : }
561 :
562 : /*
563 : * Transform a column list (represented by an array Datum) to a bitmapset.
564 : *
565 : * If columns isn't NULL, add the column numbers to that set.
566 : *
567 : * If mcxt isn't NULL, build the bitmapset in that context.
568 : */
569 : Bitmapset *
570 428 : pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
571 : {
572 428 : Bitmapset *result = NULL;
573 : ArrayType *arr;
574 : int nelems;
575 : int16 *elems;
576 428 : MemoryContext oldcxt = NULL;
577 :
578 : /*
579 : * If an existing bitmap was provided, use it. Otherwise just use NULL and
580 : * build a new bitmap.
581 : */
582 428 : if (columns)
583 0 : result = columns;
584 :
585 428 : arr = DatumGetArrayTypeP(pubcols);
586 428 : nelems = ARR_DIMS(arr)[0];
587 428 : elems = (int16 *) ARR_DATA_PTR(arr);
588 :
589 : /* If a memory context was specified, switch to it. */
590 428 : if (mcxt)
591 74 : oldcxt = MemoryContextSwitchTo(mcxt);
592 :
593 1188 : for (int i = 0; i < nelems; i++)
594 760 : result = bms_add_member(result, elems[i]);
595 :
596 428 : if (mcxt)
597 74 : MemoryContextSwitchTo(oldcxt);
598 :
599 428 : return result;
600 : }
601 :
602 : /*
603 : * Insert new publication / schema mapping.
604 : */
605 : ObjectAddress
606 222 : publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
607 : {
608 : Relation rel;
609 : HeapTuple tup;
610 : Datum values[Natts_pg_publication_namespace];
611 : bool nulls[Natts_pg_publication_namespace];
612 : Oid psschid;
613 222 : Publication *pub = GetPublication(pubid);
614 222 : List *schemaRels = NIL;
615 : ObjectAddress myself,
616 : referenced;
617 :
618 222 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
619 :
620 : /*
621 : * Check for duplicates. Note that this does not really prevent
622 : * duplicates, it's here just to provide nicer error message in common
623 : * case. The real protection is the unique key on the catalog.
624 : */
625 222 : if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
626 : ObjectIdGetDatum(schemaid),
627 : ObjectIdGetDatum(pubid)))
628 : {
629 18 : table_close(rel, RowExclusiveLock);
630 :
631 18 : if (if_not_exists)
632 12 : return InvalidObjectAddress;
633 :
634 6 : ereport(ERROR,
635 : (errcode(ERRCODE_DUPLICATE_OBJECT),
636 : errmsg("schema \"%s\" is already member of publication \"%s\"",
637 : get_namespace_name(schemaid), pub->name)));
638 : }
639 :
640 204 : check_publication_add_schema(schemaid);
641 :
642 : /* Form a tuple */
643 198 : memset(values, 0, sizeof(values));
644 198 : memset(nulls, false, sizeof(nulls));
645 :
646 198 : psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId,
647 : Anum_pg_publication_namespace_oid);
648 198 : values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid);
649 198 : values[Anum_pg_publication_namespace_pnpubid - 1] =
650 198 : ObjectIdGetDatum(pubid);
651 198 : values[Anum_pg_publication_namespace_pnnspid - 1] =
652 198 : ObjectIdGetDatum(schemaid);
653 :
654 198 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
655 :
656 : /* Insert tuple into catalog */
657 198 : CatalogTupleInsert(rel, tup);
658 198 : heap_freetuple(tup);
659 :
660 198 : ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid);
661 :
662 : /* Add dependency on the publication */
663 198 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
664 198 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
665 :
666 : /* Add dependency on the schema */
667 198 : ObjectAddressSet(referenced, NamespaceRelationId, schemaid);
668 198 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
669 :
670 : /* Close the table */
671 198 : table_close(rel, RowExclusiveLock);
672 :
673 : /*
674 : * Invalidate relcache so that publication info is rebuilt. See
675 : * publication_add_relation for why we need to consider all the
676 : * partitions.
677 : */
678 198 : schemaRels = GetSchemaPublicationRelations(schemaid,
679 : PUBLICATION_PART_ALL);
680 198 : InvalidatePublicationRels(schemaRels);
681 :
682 198 : return myself;
683 : }
684 :
685 : /* Gets list of publication oids for a relation */
686 : List *
687 11098 : GetRelationPublications(Oid relid)
688 : {
689 11098 : List *result = NIL;
690 : CatCList *pubrellist;
691 : int i;
692 :
693 : /* Find all publications associated with the relation. */
694 11098 : pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
695 : ObjectIdGetDatum(relid));
696 12522 : for (i = 0; i < pubrellist->n_members; i++)
697 : {
698 1424 : HeapTuple tup = &pubrellist->members[i]->tuple;
699 1424 : Oid pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
700 :
701 1424 : result = lappend_oid(result, pubid);
702 : }
703 :
704 11098 : ReleaseSysCacheList(pubrellist);
705 :
706 11098 : return result;
707 : }
708 :
709 : /*
710 : * Gets list of relation oids for a publication.
711 : *
712 : * This should only be used FOR TABLE publications, the FOR ALL TABLES
713 : * should use GetAllTablesPublicationRelations().
714 : */
715 : List *
716 2074 : GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
717 : {
718 : List *result;
719 : Relation pubrelsrel;
720 : ScanKeyData scankey;
721 : SysScanDesc scan;
722 : HeapTuple tup;
723 :
724 : /* Find all publications associated with the relation. */
725 2074 : pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
726 :
727 2074 : ScanKeyInit(&scankey,
728 : Anum_pg_publication_rel_prpubid,
729 : BTEqualStrategyNumber, F_OIDEQ,
730 : ObjectIdGetDatum(pubid));
731 :
732 2074 : scan = systable_beginscan(pubrelsrel, PublicationRelPrpubidIndexId,
733 : true, NULL, 1, &scankey);
734 :
735 2074 : result = NIL;
736 4908 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
737 : {
738 : Form_pg_publication_rel pubrel;
739 :
740 2834 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
741 2834 : result = GetPubPartitionOptionRelations(result, pub_partopt,
742 : pubrel->prrelid);
743 : }
744 :
745 2074 : systable_endscan(scan);
746 2074 : table_close(pubrelsrel, AccessShareLock);
747 :
748 : /* Now sort and de-duplicate the result list */
749 2074 : list_sort(result, list_oid_cmp);
750 2074 : list_deduplicate_oid(result);
751 :
752 2074 : return result;
753 : }
754 :
755 : /*
756 : * Gets list of publication oids for publications marked as FOR ALL TABLES.
757 : */
758 : List *
759 7698 : GetAllTablesPublications(void)
760 : {
761 : List *result;
762 : Relation rel;
763 : ScanKeyData scankey;
764 : SysScanDesc scan;
765 : HeapTuple tup;
766 :
767 : /* Find all publications that are marked as for all tables. */
768 7698 : rel = table_open(PublicationRelationId, AccessShareLock);
769 :
770 7698 : ScanKeyInit(&scankey,
771 : Anum_pg_publication_puballtables,
772 : BTEqualStrategyNumber, F_BOOLEQ,
773 : BoolGetDatum(true));
774 :
775 7698 : scan = systable_beginscan(rel, InvalidOid, false,
776 : NULL, 1, &scankey);
777 :
778 7698 : result = NIL;
779 7844 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
780 : {
781 146 : Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
782 :
783 146 : result = lappend_oid(result, oid);
784 : }
785 :
786 7698 : systable_endscan(scan);
787 7698 : table_close(rel, AccessShareLock);
788 :
789 7698 : return result;
790 : }
791 :
792 : /*
793 : * Gets list of all relation published by FOR ALL TABLES publication(s).
794 : *
795 : * If the publication publishes partition changes via their respective root
796 : * partitioned tables, we must exclude partitions in favor of including the
797 : * root partitioned tables.
798 : */
799 : List *
800 284 : GetAllTablesPublicationRelations(bool pubviaroot)
801 : {
802 : Relation classRel;
803 : ScanKeyData key[1];
804 : TableScanDesc scan;
805 : HeapTuple tuple;
806 284 : List *result = NIL;
807 :
808 284 : classRel = table_open(RelationRelationId, AccessShareLock);
809 :
810 284 : ScanKeyInit(&key[0],
811 : Anum_pg_class_relkind,
812 : BTEqualStrategyNumber, F_CHAREQ,
813 : CharGetDatum(RELKIND_RELATION));
814 :
815 284 : scan = table_beginscan_catalog(classRel, 1, key);
816 :
817 21026 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
818 : {
819 20742 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
820 20742 : Oid relid = relForm->oid;
821 :
822 20742 : if (is_publishable_class(relid, relForm) &&
823 1418 : !(relForm->relispartition && pubviaroot))
824 1236 : result = lappend_oid(result, relid);
825 : }
826 :
827 284 : table_endscan(scan);
828 :
829 284 : if (pubviaroot)
830 : {
831 26 : ScanKeyInit(&key[0],
832 : Anum_pg_class_relkind,
833 : BTEqualStrategyNumber, F_CHAREQ,
834 : CharGetDatum(RELKIND_PARTITIONED_TABLE));
835 :
836 26 : scan = table_beginscan_catalog(classRel, 1, key);
837 :
838 156 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
839 : {
840 130 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
841 130 : Oid relid = relForm->oid;
842 :
843 130 : if (is_publishable_class(relid, relForm) &&
844 130 : !relForm->relispartition)
845 104 : result = lappend_oid(result, relid);
846 : }
847 :
848 26 : table_endscan(scan);
849 : }
850 :
851 284 : table_close(classRel, AccessShareLock);
852 284 : return result;
853 : }
854 :
855 : /*
856 : * Gets the list of schema oids for a publication.
857 : *
858 : * This should only be used FOR TABLES IN SCHEMA publications.
859 : */
860 : List *
861 2004 : GetPublicationSchemas(Oid pubid)
862 : {
863 2004 : List *result = NIL;
864 : Relation pubschsrel;
865 : ScanKeyData scankey;
866 : SysScanDesc scan;
867 : HeapTuple tup;
868 :
869 : /* Find all schemas associated with the publication */
870 2004 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
871 :
872 2004 : ScanKeyInit(&scankey,
873 : Anum_pg_publication_namespace_pnpubid,
874 : BTEqualStrategyNumber, F_OIDEQ,
875 : ObjectIdGetDatum(pubid));
876 :
877 2004 : scan = systable_beginscan(pubschsrel,
878 : PublicationNamespacePnnspidPnpubidIndexId,
879 : true, NULL, 1, &scankey);
880 2104 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
881 : {
882 : Form_pg_publication_namespace pubsch;
883 :
884 100 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
885 :
886 100 : result = lappend_oid(result, pubsch->pnnspid);
887 : }
888 :
889 2004 : systable_endscan(scan);
890 2004 : table_close(pubschsrel, AccessShareLock);
891 :
892 2004 : return result;
893 : }
894 :
895 : /*
896 : * Gets the list of publication oids associated with a specified schema.
897 : */
898 : List *
899 10762 : GetSchemaPublications(Oid schemaid)
900 : {
901 10762 : List *result = NIL;
902 : CatCList *pubschlist;
903 : int i;
904 :
905 : /* Find all publications associated with the schema */
906 10762 : pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
907 : ObjectIdGetDatum(schemaid));
908 10876 : for (i = 0; i < pubschlist->n_members; i++)
909 : {
910 114 : HeapTuple tup = &pubschlist->members[i]->tuple;
911 114 : Oid pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
912 :
913 114 : result = lappend_oid(result, pubid);
914 : }
915 :
916 10762 : ReleaseSysCacheList(pubschlist);
917 :
918 10762 : return result;
919 : }
920 :
921 : /*
922 : * Get the list of publishable relation oids for a specified schema.
923 : */
924 : List *
925 460 : GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
926 : {
927 : Relation classRel;
928 : ScanKeyData key[1];
929 : TableScanDesc scan;
930 : HeapTuple tuple;
931 460 : List *result = NIL;
932 :
933 : Assert(OidIsValid(schemaid));
934 :
935 460 : classRel = table_open(RelationRelationId, AccessShareLock);
936 :
937 460 : ScanKeyInit(&key[0],
938 : Anum_pg_class_relnamespace,
939 : BTEqualStrategyNumber, F_OIDEQ,
940 : schemaid);
941 :
942 : /* get all the relations present in the specified schema */
943 460 : scan = table_beginscan_catalog(classRel, 1, key);
944 27404 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
945 : {
946 26944 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
947 26944 : Oid relid = relForm->oid;
948 : char relkind;
949 :
950 26944 : if (!is_publishable_class(relid, relForm))
951 11108 : continue;
952 :
953 15836 : relkind = get_rel_relkind(relid);
954 15836 : if (relkind == RELKIND_RELATION)
955 15198 : result = lappend_oid(result, relid);
956 638 : else if (relkind == RELKIND_PARTITIONED_TABLE)
957 : {
958 638 : List *partitionrels = NIL;
959 :
960 : /*
961 : * It is quite possible that some of the partitions are in a
962 : * different schema than the parent table, so we need to get such
963 : * partitions separately.
964 : */
965 638 : partitionrels = GetPubPartitionOptionRelations(partitionrels,
966 : pub_partopt,
967 : relForm->oid);
968 638 : result = list_concat_unique_oid(result, partitionrels);
969 : }
970 : }
971 :
972 460 : table_endscan(scan);
973 460 : table_close(classRel, AccessShareLock);
974 460 : return result;
975 : }
976 :
977 : /*
978 : * Gets the list of all relations published by FOR TABLES IN SCHEMA
979 : * publication.
980 : */
981 : List *
982 1612 : GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
983 : {
984 1612 : List *result = NIL;
985 1612 : List *pubschemalist = GetPublicationSchemas(pubid);
986 : ListCell *cell;
987 :
988 1682 : foreach(cell, pubschemalist)
989 : {
990 70 : Oid schemaid = lfirst_oid(cell);
991 70 : List *schemaRels = NIL;
992 :
993 70 : schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt);
994 70 : result = list_concat(result, schemaRels);
995 : }
996 :
997 1612 : return result;
998 : }
999 :
1000 : /*
1001 : * Get publication using oid
1002 : *
1003 : * The Publication struct and its data are palloc'ed here.
1004 : */
1005 : Publication *
1006 7190 : GetPublication(Oid pubid)
1007 : {
1008 : HeapTuple tup;
1009 : Publication *pub;
1010 : Form_pg_publication pubform;
1011 :
1012 7190 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1013 7190 : if (!HeapTupleIsValid(tup))
1014 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1015 :
1016 7190 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1017 :
1018 7190 : pub = (Publication *) palloc(sizeof(Publication));
1019 7190 : pub->oid = pubid;
1020 7190 : pub->name = pstrdup(NameStr(pubform->pubname));
1021 7190 : pub->alltables = pubform->puballtables;
1022 7190 : pub->pubactions.pubinsert = pubform->pubinsert;
1023 7190 : pub->pubactions.pubupdate = pubform->pubupdate;
1024 7190 : pub->pubactions.pubdelete = pubform->pubdelete;
1025 7190 : pub->pubactions.pubtruncate = pubform->pubtruncate;
1026 7190 : pub->pubviaroot = pubform->pubviaroot;
1027 :
1028 7190 : ReleaseSysCache(tup);
1029 :
1030 7190 : return pub;
1031 : }
1032 :
1033 : /*
1034 : * Get Publication using name.
1035 : */
1036 : Publication *
1037 2208 : GetPublicationByName(const char *pubname, bool missing_ok)
1038 : {
1039 : Oid oid;
1040 :
1041 2208 : oid = get_publication_oid(pubname, missing_ok);
1042 :
1043 2204 : return OidIsValid(oid) ? GetPublication(oid) : NULL;
1044 : }
1045 :
1046 : /*
1047 : * Get information of the tables in the given publication array.
1048 : *
1049 : * Returns pubid, relid, column list, row filter for each table.
1050 : */
1051 : Datum
1052 5476 : pg_get_publication_tables(PG_FUNCTION_ARGS)
1053 : {
1054 : #define NUM_PUBLICATION_TABLES_ELEM 4
1055 : FuncCallContext *funcctx;
1056 5476 : List *table_infos = NIL;
1057 :
1058 : /* stuff done only on the first call of the function */
1059 5476 : if (SRF_IS_FIRSTCALL())
1060 : {
1061 : TupleDesc tupdesc;
1062 : MemoryContext oldcontext;
1063 : ArrayType *arr;
1064 : Datum *elems;
1065 : int nelems,
1066 : i;
1067 1720 : bool viaroot = false;
1068 :
1069 : /* create a function context for cross-call persistence */
1070 1720 : funcctx = SRF_FIRSTCALL_INIT();
1071 :
1072 : /* switch to memory context appropriate for multiple function calls */
1073 1720 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1074 :
1075 : /*
1076 : * Deconstruct the parameter into elements where each element is a
1077 : * publication name.
1078 : */
1079 1720 : arr = PG_GETARG_ARRAYTYPE_P(0);
1080 1720 : deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT,
1081 : &elems, NULL, &nelems);
1082 :
1083 : /* Get Oids of tables from each publication. */
1084 3526 : for (i = 0; i < nelems; i++)
1085 : {
1086 : Publication *pub_elem;
1087 1806 : List *pub_elem_tables = NIL;
1088 : ListCell *lc;
1089 :
1090 1806 : pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]), false);
1091 :
1092 : /*
1093 : * Publications support partitioned tables. If
1094 : * publish_via_partition_root is false, all changes are replicated
1095 : * using leaf partition identity and schema, so we only need
1096 : * those. Otherwise, get the partitioned table itself.
1097 : */
1098 1806 : if (pub_elem->alltables)
1099 284 : pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot);
1100 : else
1101 : {
1102 : List *relids,
1103 : *schemarelids;
1104 :
1105 1522 : relids = GetPublicationRelations(pub_elem->oid,
1106 1522 : pub_elem->pubviaroot ?
1107 1522 : PUBLICATION_PART_ROOT :
1108 : PUBLICATION_PART_LEAF);
1109 1522 : schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
1110 1522 : pub_elem->pubviaroot ?
1111 1522 : PUBLICATION_PART_ROOT :
1112 : PUBLICATION_PART_LEAF);
1113 1522 : pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
1114 : }
1115 :
1116 : /*
1117 : * Record the published table and the corresponding publication so
1118 : * that we can get row filters and column lists later.
1119 : *
1120 : * When a table is published by multiple publications, to obtain
1121 : * all row filters and column lists, the structure related to this
1122 : * table will be recorded multiple times.
1123 : */
1124 5642 : foreach(lc, pub_elem_tables)
1125 : {
1126 3836 : published_rel *table_info = (published_rel *) palloc(sizeof(published_rel));
1127 :
1128 3836 : table_info->relid = lfirst_oid(lc);
1129 3836 : table_info->pubid = pub_elem->oid;
1130 3836 : table_infos = lappend(table_infos, table_info);
1131 : }
1132 :
1133 : /* At least one publication is using publish_via_partition_root. */
1134 1806 : if (pub_elem->pubviaroot)
1135 352 : viaroot = true;
1136 : }
1137 :
1138 : /*
1139 : * If the publication publishes partition changes via their respective
1140 : * root partitioned tables, we must exclude partitions in favor of
1141 : * including the root partitioned tables. Otherwise, the function
1142 : * could return both the child and parent tables which could cause
1143 : * data of the child table to be double-published on the subscriber
1144 : * side.
1145 : */
1146 1720 : if (viaroot)
1147 336 : filter_partitions(table_infos);
1148 :
1149 : /* Construct a tuple descriptor for the result rows. */
1150 1720 : tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
1151 1720 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
1152 : OIDOID, -1, 0);
1153 1720 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
1154 : OIDOID, -1, 0);
1155 1720 : TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
1156 : INT2VECTOROID, -1, 0);
1157 1720 : TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
1158 : PG_NODE_TREEOID, -1, 0);
1159 :
1160 1720 : funcctx->tuple_desc = BlessTupleDesc(tupdesc);
1161 1720 : funcctx->user_fctx = (void *) table_infos;
1162 :
1163 1720 : MemoryContextSwitchTo(oldcontext);
1164 : }
1165 :
1166 : /* stuff done on every call of the function */
1167 5476 : funcctx = SRF_PERCALL_SETUP();
1168 5476 : table_infos = (List *) funcctx->user_fctx;
1169 :
1170 5476 : if (funcctx->call_cntr < list_length(table_infos))
1171 : {
1172 3756 : HeapTuple pubtuple = NULL;
1173 : HeapTuple rettuple;
1174 : Publication *pub;
1175 3756 : published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
1176 3756 : Oid relid = table_info->relid;
1177 3756 : Oid schemaid = get_rel_namespace(relid);
1178 3756 : Datum values[NUM_PUBLICATION_TABLES_ELEM] = {0};
1179 3756 : bool nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
1180 :
1181 : /*
1182 : * Form tuple with appropriate data.
1183 : */
1184 :
1185 3756 : pub = GetPublication(table_info->pubid);
1186 :
1187 3756 : values[0] = ObjectIdGetDatum(pub->oid);
1188 3756 : values[1] = ObjectIdGetDatum(relid);
1189 :
1190 : /*
1191 : * We don't consider row filters or column lists for FOR ALL TABLES or
1192 : * FOR TABLES IN SCHEMA publications.
1193 : */
1194 3756 : if (!pub->alltables &&
1195 2416 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
1196 : ObjectIdGetDatum(schemaid),
1197 : ObjectIdGetDatum(pub->oid)))
1198 2322 : pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
1199 : ObjectIdGetDatum(relid),
1200 : ObjectIdGetDatum(pub->oid));
1201 :
1202 3756 : if (HeapTupleIsValid(pubtuple))
1203 : {
1204 : /* Lookup the column list attribute. */
1205 2104 : values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1206 : Anum_pg_publication_rel_prattrs,
1207 : &(nulls[2]));
1208 :
1209 : /* Null indicates no filter. */
1210 2104 : values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1211 : Anum_pg_publication_rel_prqual,
1212 : &(nulls[3]));
1213 : }
1214 : else
1215 : {
1216 1652 : nulls[2] = true;
1217 1652 : nulls[3] = true;
1218 : }
1219 :
1220 : /* Show all columns when the column list is not specified. */
1221 3756 : if (nulls[2])
1222 : {
1223 3522 : Relation rel = table_open(relid, AccessShareLock);
1224 3522 : int nattnums = 0;
1225 : int16 *attnums;
1226 3522 : TupleDesc desc = RelationGetDescr(rel);
1227 : int i;
1228 :
1229 3522 : attnums = (int16 *) palloc(desc->natts * sizeof(int16));
1230 :
1231 9424 : for (i = 0; i < desc->natts; i++)
1232 : {
1233 5902 : Form_pg_attribute att = TupleDescAttr(desc, i);
1234 :
1235 5902 : if (att->attisdropped || att->attgenerated)
1236 34 : continue;
1237 :
1238 5868 : attnums[nattnums++] = att->attnum;
1239 : }
1240 :
1241 3522 : if (nattnums > 0)
1242 : {
1243 3478 : values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
1244 3478 : nulls[2] = false;
1245 : }
1246 :
1247 3522 : table_close(rel, AccessShareLock);
1248 : }
1249 :
1250 3756 : rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
1251 :
1252 3756 : SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
1253 : }
1254 :
1255 1720 : SRF_RETURN_DONE(funcctx);
1256 : }
|