Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * publicationcmds.c
4 : * publication manipulation
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/publicationcmds.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/htup_details.h"
18 : #include "access/table.h"
19 : #include "access/xact.h"
20 : #include "catalog/catalog.h"
21 : #include "catalog/indexing.h"
22 : #include "catalog/namespace.h"
23 : #include "catalog/objectaccess.h"
24 : #include "catalog/objectaddress.h"
25 : #include "catalog/pg_database.h"
26 : #include "catalog/pg_inherits.h"
27 : #include "catalog/pg_namespace.h"
28 : #include "catalog/pg_proc.h"
29 : #include "catalog/pg_publication.h"
30 : #include "catalog/pg_publication_namespace.h"
31 : #include "catalog/pg_publication_rel.h"
32 : #include "commands/defrem.h"
33 : #include "commands/event_trigger.h"
34 : #include "commands/publicationcmds.h"
35 : #include "miscadmin.h"
36 : #include "nodes/nodeFuncs.h"
37 : #include "parser/parse_clause.h"
38 : #include "parser/parse_collate.h"
39 : #include "parser/parse_relation.h"
40 : #include "rewrite/rewriteHandler.h"
41 : #include "storage/lmgr.h"
42 : #include "utils/acl.h"
43 : #include "utils/builtins.h"
44 : #include "utils/inval.h"
45 : #include "utils/lsyscache.h"
46 : #include "utils/rel.h"
47 : #include "utils/syscache.h"
48 : #include "utils/varlena.h"
49 :
50 :
51 : /*
52 : * Information used to validate the columns in the row filter expression. See
53 : * contain_invalid_rfcolumn_walker for details.
54 : */
55 : typedef struct rf_context
56 : {
57 : Bitmapset *bms_replident; /* bitset of replica identity columns */
58 : bool pubviaroot; /* true if we are validating the parent
59 : * relation's row filter */
60 : Oid relid; /* relid of the relation */
61 : Oid parentid; /* relid of the parent relation */
62 : } rf_context;
63 :
64 : static List *OpenTableList(List *tables);
65 : static void CloseTableList(List *rels);
66 : static void LockSchemaList(List *schemalist);
67 : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
68 : AlterPublicationStmt *stmt);
69 : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
70 : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
71 : AlterPublicationStmt *stmt);
72 : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
73 : static char defGetGeneratedColsOption(DefElem *def);
74 :
75 :
76 : static void
77 1012 : parse_publication_options(ParseState *pstate,
78 : List *options,
79 : bool *publish_given,
80 : PublicationActions *pubactions,
81 : bool *publish_via_partition_root_given,
82 : bool *publish_via_partition_root,
83 : bool *publish_generated_columns_given,
84 : char *publish_generated_columns)
85 : {
86 : ListCell *lc;
87 :
88 1012 : *publish_given = false;
89 1012 : *publish_via_partition_root_given = false;
90 1012 : *publish_generated_columns_given = false;
91 :
92 : /* defaults */
93 1012 : pubactions->pubinsert = true;
94 1012 : pubactions->pubupdate = true;
95 1012 : pubactions->pubdelete = true;
96 1012 : pubactions->pubtruncate = true;
97 1012 : *publish_via_partition_root = false;
98 1012 : *publish_generated_columns = PUBLISH_GENCOLS_NONE;
99 :
100 : /* Parse options */
101 1376 : foreach(lc, options)
102 : {
103 400 : DefElem *defel = (DefElem *) lfirst(lc);
104 :
105 400 : if (strcmp(defel->defname, "publish") == 0)
106 : {
107 : char *publish;
108 : List *publish_list;
109 : ListCell *lc2;
110 :
111 140 : if (*publish_given)
112 0 : errorConflictingDefElem(defel, pstate);
113 :
114 : /*
115 : * If publish option was given only the explicitly listed actions
116 : * should be published.
117 : */
118 140 : pubactions->pubinsert = false;
119 140 : pubactions->pubupdate = false;
120 140 : pubactions->pubdelete = false;
121 140 : pubactions->pubtruncate = false;
122 :
123 140 : *publish_given = true;
124 :
125 : /*
126 : * SplitIdentifierString destructively modifies its input, so make
127 : * a copy so we don't modify the memory of the executing statement
128 : */
129 140 : publish = pstrdup(defGetString(defel));
130 :
131 140 : if (!SplitIdentifierString(publish, ',', &publish_list))
132 0 : ereport(ERROR,
133 : (errcode(ERRCODE_SYNTAX_ERROR),
134 : errmsg("invalid list syntax in parameter \"%s\"",
135 : "publish")));
136 :
137 : /* Process the option list. */
138 334 : foreach(lc2, publish_list)
139 : {
140 200 : char *publish_opt = (char *) lfirst(lc2);
141 :
142 200 : if (strcmp(publish_opt, "insert") == 0)
143 124 : pubactions->pubinsert = true;
144 76 : else if (strcmp(publish_opt, "update") == 0)
145 30 : pubactions->pubupdate = true;
146 46 : else if (strcmp(publish_opt, "delete") == 0)
147 20 : pubactions->pubdelete = true;
148 26 : else if (strcmp(publish_opt, "truncate") == 0)
149 20 : pubactions->pubtruncate = true;
150 : else
151 6 : ereport(ERROR,
152 : (errcode(ERRCODE_SYNTAX_ERROR),
153 : errmsg("unrecognized value for publication option \"%s\": \"%s\"",
154 : "publish", publish_opt)));
155 : }
156 : }
157 260 : else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
158 : {
159 172 : if (*publish_via_partition_root_given)
160 6 : errorConflictingDefElem(defel, pstate);
161 166 : *publish_via_partition_root_given = true;
162 166 : *publish_via_partition_root = defGetBoolean(defel);
163 : }
164 88 : else if (strcmp(defel->defname, "publish_generated_columns") == 0)
165 : {
166 82 : if (*publish_generated_columns_given)
167 6 : errorConflictingDefElem(defel, pstate);
168 76 : *publish_generated_columns_given = true;
169 76 : *publish_generated_columns = defGetGeneratedColsOption(defel);
170 : }
171 : else
172 6 : ereport(ERROR,
173 : (errcode(ERRCODE_SYNTAX_ERROR),
174 : errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
175 : }
176 976 : }
177 :
178 : /*
179 : * Convert the PublicationObjSpecType list into schema oid list and
180 : * PublicationTable list.
181 : */
182 : static void
183 1700 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
184 : List **rels, List **schemas)
185 : {
186 : ListCell *cell;
187 : PublicationObjSpec *pubobj;
188 :
189 1700 : if (!pubobjspec_list)
190 104 : return;
191 :
192 3388 : foreach(cell, pubobjspec_list)
193 : {
194 : Oid schemaid;
195 : List *search_path;
196 :
197 1828 : pubobj = (PublicationObjSpec *) lfirst(cell);
198 :
199 1828 : switch (pubobj->pubobjtype)
200 : {
201 1432 : case PUBLICATIONOBJ_TABLE:
202 1432 : *rels = lappend(*rels, pubobj->pubtable);
203 1432 : break;
204 372 : case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
205 372 : schemaid = get_namespace_oid(pubobj->name, false);
206 :
207 : /* Filter out duplicates if user specifies "sch1, sch1" */
208 342 : *schemas = list_append_unique_oid(*schemas, schemaid);
209 342 : break;
210 24 : case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
211 24 : search_path = fetch_search_path(false);
212 24 : if (search_path == NIL) /* nothing valid in search_path? */
213 6 : ereport(ERROR,
214 : errcode(ERRCODE_UNDEFINED_SCHEMA),
215 : errmsg("no schema has been selected for CURRENT_SCHEMA"));
216 :
217 18 : schemaid = linitial_oid(search_path);
218 18 : list_free(search_path);
219 :
220 : /* Filter out duplicates if user specifies "sch1, sch1" */
221 18 : *schemas = list_append_unique_oid(*schemas, schemaid);
222 18 : break;
223 0 : default:
224 : /* shouldn't happen */
225 0 : elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
226 : break;
227 : }
228 : }
229 : }
230 :
231 : /*
232 : * Returns true if any of the columns used in the row filter WHERE expression is
233 : * not part of REPLICA IDENTITY, false otherwise.
234 : */
235 : static bool
236 258 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
237 : {
238 258 : if (node == NULL)
239 0 : return false;
240 :
241 258 : if (IsA(node, Var))
242 : {
243 104 : Var *var = (Var *) node;
244 104 : AttrNumber attnum = var->varattno;
245 :
246 : /*
247 : * If pubviaroot is true, we are validating the row filter of the
248 : * parent table, but the bitmap contains the replica identity
249 : * information of the child table. So, get the column number of the
250 : * child table as parent and child column order could be different.
251 : */
252 104 : if (context->pubviaroot)
253 : {
254 16 : char *colname = get_attname(context->parentid, attnum, false);
255 :
256 16 : attnum = get_attnum(context->relid, colname);
257 : }
258 :
259 104 : if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
260 104 : context->bms_replident))
261 60 : return true;
262 : }
263 :
264 198 : return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
265 : context);
266 : }
267 :
268 : /*
269 : * Check if all columns referenced in the filter expression are part of the
270 : * REPLICA IDENTITY index or not.
271 : *
272 : * Returns true if any invalid column is found.
273 : */
274 : bool
275 726 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
276 : bool pubviaroot)
277 : {
278 : HeapTuple rftuple;
279 726 : Oid relid = RelationGetRelid(relation);
280 726 : Oid publish_as_relid = RelationGetRelid(relation);
281 726 : bool result = false;
282 : Datum rfdatum;
283 : bool rfisnull;
284 :
285 : /*
286 : * FULL means all columns are in the REPLICA IDENTITY, so all columns are
287 : * allowed in the row filter and we can skip the validation.
288 : */
289 726 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
290 198 : return false;
291 :
292 : /*
293 : * For a partition, if pubviaroot is true, find the topmost ancestor that
294 : * is published via this publication as we need to use its row filter
295 : * expression to filter the partition's changes.
296 : *
297 : * Note that even though the row filter used is for an ancestor, the
298 : * REPLICA IDENTITY used will be for the actual child table.
299 : */
300 528 : if (pubviaroot && relation->rd_rel->relispartition)
301 : {
302 : publish_as_relid
303 108 : = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
304 :
305 108 : if (!OidIsValid(publish_as_relid))
306 6 : publish_as_relid = relid;
307 : }
308 :
309 528 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
310 : ObjectIdGetDatum(publish_as_relid),
311 : ObjectIdGetDatum(pubid));
312 :
313 528 : if (!HeapTupleIsValid(rftuple))
314 56 : return false;
315 :
316 472 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
317 : Anum_pg_publication_rel_prqual,
318 : &rfisnull);
319 :
320 472 : if (!rfisnull)
321 : {
322 102 : rf_context context = {0};
323 : Node *rfnode;
324 102 : Bitmapset *bms = NULL;
325 :
326 102 : context.pubviaroot = pubviaroot;
327 102 : context.parentid = publish_as_relid;
328 102 : context.relid = relid;
329 :
330 : /* Remember columns that are part of the REPLICA IDENTITY */
331 102 : bms = RelationGetIndexAttrBitmap(relation,
332 : INDEX_ATTR_BITMAP_IDENTITY_KEY);
333 :
334 102 : context.bms_replident = bms;
335 102 : rfnode = stringToNode(TextDatumGetCString(rfdatum));
336 102 : result = contain_invalid_rfcolumn_walker(rfnode, &context);
337 : }
338 :
339 472 : ReleaseSysCache(rftuple);
340 :
341 472 : return result;
342 : }
343 :
344 : /*
345 : * Check for invalid columns in the publication table definition.
346 : *
347 : * This function evaluates two conditions:
348 : *
349 : * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
350 : * by the column list. If any column is missing, *invalid_column_list is set
351 : * to true.
352 : * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
353 : * are published, either by being explicitly named in the column list or, if
354 : * no column list is specified, by setting the option
355 : * publish_generated_columns to stored. If any unpublished
356 : * generated column is found, *invalid_gen_col is set to true.
357 : *
358 : * Returns true if any of the above conditions are not met.
359 : */
360 : bool
361 942 : pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
362 : bool pubviaroot, char pubgencols_type,
363 : bool *invalid_column_list,
364 : bool *invalid_gen_col)
365 : {
366 942 : Oid relid = RelationGetRelid(relation);
367 942 : Oid publish_as_relid = RelationGetRelid(relation);
368 : Bitmapset *idattrs;
369 942 : Bitmapset *columns = NULL;
370 942 : TupleDesc desc = RelationGetDescr(relation);
371 : Publication *pub;
372 : int x;
373 :
374 942 : *invalid_column_list = false;
375 942 : *invalid_gen_col = false;
376 :
377 : /*
378 : * For a partition, if pubviaroot is true, find the topmost ancestor that
379 : * is published via this publication as we need to use its column list for
380 : * the changes.
381 : *
382 : * Note that even though the column list used is for an ancestor, the
383 : * REPLICA IDENTITY used will be for the actual child table.
384 : */
385 942 : if (pubviaroot && relation->rd_rel->relispartition)
386 : {
387 160 : publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
388 :
389 160 : if (!OidIsValid(publish_as_relid))
390 36 : publish_as_relid = relid;
391 : }
392 :
393 : /* Fetch the column list */
394 942 : pub = GetPublication(pubid);
395 942 : check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
396 :
397 942 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
398 : {
399 : /* With REPLICA IDENTITY FULL, no column list is allowed. */
400 212 : *invalid_column_list = (columns != NULL);
401 :
402 : /*
403 : * As we don't allow a column list with REPLICA IDENTITY FULL, the
404 : * publish_generated_columns option must be set to stored if the table
405 : * has any stored generated columns.
406 : */
407 212 : if (pubgencols_type != PUBLISH_GENCOLS_STORED &&
408 200 : relation->rd_att->constr &&
409 88 : relation->rd_att->constr->has_generated_stored)
410 6 : *invalid_gen_col = true;
411 :
412 : /*
413 : * Virtual generated columns are currently not supported for logical
414 : * replication at all.
415 : */
416 212 : if (relation->rd_att->constr &&
417 100 : relation->rd_att->constr->has_generated_virtual)
418 12 : *invalid_gen_col = true;
419 :
420 212 : if (*invalid_gen_col && *invalid_column_list)
421 0 : return true;
422 : }
423 :
424 : /* Remember columns that are part of the REPLICA IDENTITY */
425 942 : idattrs = RelationGetIndexAttrBitmap(relation,
426 : INDEX_ATTR_BITMAP_IDENTITY_KEY);
427 :
428 : /*
429 : * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
430 : * (to handle system columns the usual way), while column list does not
431 : * use offset, so we can't do bms_is_subset(). Instead, we have to loop
432 : * over the idattrs and check all of them are in the list.
433 : */
434 942 : x = -1;
435 1598 : while ((x = bms_next_member(idattrs, x)) >= 0)
436 : {
437 662 : AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber);
438 662 : Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
439 :
440 662 : if (columns == NULL)
441 : {
442 : /*
443 : * The publish_generated_columns option must be set to stored if
444 : * the REPLICA IDENTITY contains any stored generated column.
445 : */
446 452 : if (att->attgenerated == ATTRIBUTE_GENERATED_STORED && pubgencols_type != PUBLISH_GENCOLS_STORED)
447 : {
448 6 : *invalid_gen_col = true;
449 6 : break;
450 : }
451 :
452 : /*
453 : * The equivalent setting for virtual generated columns does not
454 : * exist yet.
455 : */
456 446 : if (att->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
457 : {
458 0 : *invalid_gen_col = true;
459 0 : break;
460 : }
461 :
462 : /* Skip validating the column list since it is not defined */
463 446 : continue;
464 : }
465 :
466 : /*
467 : * If pubviaroot is true, we are validating the column list of the
468 : * parent table, but the bitmap contains the replica identity
469 : * information of the child table. The parent/child attnums may not
470 : * match, so translate them to the parent - get the attname from the
471 : * child, and look it up in the parent.
472 : */
473 210 : if (pubviaroot)
474 : {
475 : /* attribute name in the child table */
476 80 : char *colname = get_attname(relid, attnum, false);
477 :
478 : /*
479 : * Determine the attnum for the attribute name in parent (we are
480 : * using the column list defined on the parent).
481 : */
482 80 : attnum = get_attnum(publish_as_relid, colname);
483 : }
484 :
485 : /* replica identity column, not covered by the column list */
486 210 : *invalid_column_list |= !bms_is_member(attnum, columns);
487 :
488 210 : if (*invalid_column_list && *invalid_gen_col)
489 0 : break;
490 : }
491 :
492 942 : bms_free(columns);
493 942 : bms_free(idattrs);
494 :
495 942 : return *invalid_column_list || *invalid_gen_col;
496 : }
497 :
498 : /*
499 : * Invalidate entries in the RelationSyncCache for relations included in the
500 : * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
501 : *
502 : * If 'puballtables' is true, invalidate all cache entries.
503 : */
504 : void
505 36 : InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
506 : {
507 36 : if (puballtables)
508 : {
509 6 : CacheInvalidateRelSyncAll();
510 : }
511 : else
512 : {
513 30 : List *relids = NIL;
514 30 : List *schemarelids = NIL;
515 :
516 : /*
517 : * For partitioned tables, we must invalidate all partitions and
518 : * itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as
519 : * a target. However, WAL records for TRUNCATE specify both a root and
520 : * its leaves.
521 : */
522 30 : relids = GetPublicationRelations(pubid,
523 : PUBLICATION_PART_ALL);
524 30 : schemarelids = GetAllSchemaPublicationRelations(pubid,
525 : PUBLICATION_PART_ALL);
526 :
527 30 : relids = list_concat_unique_oid(relids, schemarelids);
528 :
529 : /* Invalidate the relsyncache */
530 66 : foreach_oid(relid, relids)
531 6 : CacheInvalidateRelSync(relid);
532 : }
533 :
534 36 : return;
535 : }
536 :
537 : /* check_functions_in_node callback */
538 : static bool
539 436 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
540 : {
541 436 : return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
542 : func_id >= FirstNormalObjectId);
543 : }
544 :
545 : /*
546 : * The row filter walker checks if the row filter expression is a "simple
547 : * expression".
548 : *
549 : * It allows only simple or compound expressions such as:
550 : * - (Var Op Const)
551 : * - (Var Op Var)
552 : * - (Var Op Const) AND/OR (Var Op Const)
553 : * - etc
554 : * (where Var is a column of the table this filter belongs to)
555 : *
556 : * The simple expression has the following restrictions:
557 : * - User-defined operators are not allowed;
558 : * - User-defined functions are not allowed;
559 : * - User-defined types are not allowed;
560 : * - User-defined collations are not allowed;
561 : * - Non-immutable built-in functions are not allowed;
562 : * - System columns are not allowed.
563 : *
564 : * NOTES
565 : *
566 : * We don't allow user-defined functions/operators/types/collations because
567 : * (a) if a user drops a user-defined object used in a row filter expression or
568 : * if there is any other error while using it, the logical decoding
569 : * infrastructure won't be able to recover from such an error even if the
570 : * object is recreated again because a historic snapshot is used to evaluate
571 : * the row filter;
572 : * (b) a user-defined function can be used to access tables that could have
573 : * unpleasant results because a historic snapshot is used. That's why only
574 : * immutable built-in functions are allowed in row filter expressions.
575 : *
576 : * We don't allow system columns because currently, we don't have that
577 : * information in the tuple passed to downstream. Also, as we don't replicate
578 : * those to subscribers, there doesn't seem to be a need for a filter on those
579 : * columns.
580 : *
581 : * We can allow other node types after more analysis and testing.
582 : */
583 : static bool
584 1440 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
585 : {
586 1440 : char *errdetail_msg = NULL;
587 :
588 1440 : if (node == NULL)
589 6 : return false;
590 :
591 1434 : switch (nodeTag(node))
592 : {
593 388 : case T_Var:
594 : /* System columns are not allowed. */
595 388 : if (((Var *) node)->varattno < InvalidAttrNumber)
596 6 : errdetail_msg = _("System columns are not allowed.");
597 388 : break;
598 392 : case T_OpExpr:
599 : case T_DistinctExpr:
600 : case T_NullIfExpr:
601 : /* OK, except user-defined operators are not allowed. */
602 392 : if (((OpExpr *) node)->opno >= FirstNormalObjectId)
603 6 : errdetail_msg = _("User-defined operators are not allowed.");
604 392 : break;
605 6 : case T_ScalarArrayOpExpr:
606 : /* OK, except user-defined operators are not allowed. */
607 6 : if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
608 0 : errdetail_msg = _("User-defined operators are not allowed.");
609 :
610 : /*
611 : * We don't need to check the hashfuncid and negfuncid of
612 : * ScalarArrayOpExpr as those functions are only built for a
613 : * subquery.
614 : */
615 6 : break;
616 6 : case T_RowCompareExpr:
617 : {
618 : ListCell *opid;
619 :
620 : /* OK, except user-defined operators are not allowed. */
621 18 : foreach(opid, ((RowCompareExpr *) node)->opnos)
622 : {
623 12 : if (lfirst_oid(opid) >= FirstNormalObjectId)
624 : {
625 0 : errdetail_msg = _("User-defined operators are not allowed.");
626 0 : break;
627 : }
628 : }
629 : }
630 6 : break;
631 636 : case T_Const:
632 : case T_FuncExpr:
633 : case T_BoolExpr:
634 : case T_RelabelType:
635 : case T_CollateExpr:
636 : case T_CaseExpr:
637 : case T_CaseTestExpr:
638 : case T_ArrayExpr:
639 : case T_RowExpr:
640 : case T_CoalesceExpr:
641 : case T_MinMaxExpr:
642 : case T_XmlExpr:
643 : case T_NullTest:
644 : case T_BooleanTest:
645 : case T_List:
646 : /* OK, supported */
647 636 : break;
648 6 : default:
649 6 : errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
650 6 : break;
651 : }
652 :
653 : /*
654 : * For all the supported nodes, if we haven't already found a problem,
655 : * check the types, functions, and collations used in it. We check List
656 : * by walking through each element.
657 : */
658 1434 : if (!errdetail_msg && !IsA(node, List))
659 : {
660 1362 : if (exprType(node) >= FirstNormalObjectId)
661 6 : errdetail_msg = _("User-defined types are not allowed.");
662 1356 : else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
663 : pstate))
664 12 : errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
665 2688 : else if (exprCollation(node) >= FirstNormalObjectId ||
666 1344 : exprInputCollation(node) >= FirstNormalObjectId)
667 6 : errdetail_msg = _("User-defined collations are not allowed.");
668 : }
669 :
670 : /*
671 : * If we found a problem in this node, throw error now. Otherwise keep
672 : * going.
673 : */
674 1434 : if (errdetail_msg)
675 42 : ereport(ERROR,
676 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
677 : errmsg("invalid publication WHERE expression"),
678 : errdetail_internal("%s", errdetail_msg),
679 : parser_errposition(pstate, exprLocation(node))));
680 :
681 1392 : return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
682 : pstate);
683 : }
684 :
685 : /*
686 : * Check if the row filter expression is a "simple expression".
687 : *
688 : * See check_simple_rowfilter_expr_walker for details.
689 : */
690 : static bool
691 376 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
692 : {
693 376 : return check_simple_rowfilter_expr_walker(node, pstate);
694 : }
695 :
696 : /*
697 : * Transform the publication WHERE expression for all the relations in the list,
698 : * ensuring it is coerced to boolean and necessary collation information is
699 : * added if required, and add a new nsitem/RTE for the associated relation to
700 : * the ParseState's namespace list.
701 : *
702 : * Also check the publication row filter expression and throw an error if
703 : * anything not permitted or unexpected is encountered.
704 : */
705 : static void
706 1186 : TransformPubWhereClauses(List *tables, const char *queryString,
707 : bool pubviaroot)
708 : {
709 : ListCell *lc;
710 :
711 2388 : foreach(lc, tables)
712 : {
713 : ParseNamespaceItem *nsitem;
714 1262 : Node *whereclause = NULL;
715 : ParseState *pstate;
716 1262 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
717 :
718 1262 : if (pri->whereClause == NULL)
719 868 : continue;
720 :
721 : /*
722 : * If the publication doesn't publish changes via the root partitioned
723 : * table, the partition's row filter will be used. So disallow using
724 : * WHERE clause on partitioned table in this case.
725 : */
726 394 : if (!pubviaroot &&
727 364 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
728 6 : ereport(ERROR,
729 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
730 : errmsg("cannot use publication WHERE clause for relation \"%s\"",
731 : RelationGetRelationName(pri->relation)),
732 : errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
733 : "publish_via_partition_root")));
734 :
735 : /*
736 : * A fresh pstate is required so that we only have "this" table in its
737 : * rangetable
738 : */
739 388 : pstate = make_parsestate(NULL);
740 388 : pstate->p_sourcetext = queryString;
741 388 : nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
742 : AccessShareLock, NULL,
743 : false, false);
744 388 : addNSItemToQuery(pstate, nsitem, false, true, true);
745 :
746 388 : whereclause = transformWhereClause(pstate,
747 388 : copyObject(pri->whereClause),
748 : EXPR_KIND_WHERE,
749 : "PUBLICATION WHERE");
750 :
751 : /* Fix up collation information */
752 376 : assign_expr_collations(pstate, whereclause);
753 :
754 376 : whereclause = expand_generated_columns_in_expr(whereclause, pri->relation, 1);
755 :
756 : /*
757 : * We allow only simple expressions in row filters. See
758 : * check_simple_rowfilter_expr_walker.
759 : */
760 376 : check_simple_rowfilter_expr(whereclause, pstate);
761 :
762 334 : free_parsestate(pstate);
763 :
764 334 : pri->whereClause = whereclause;
765 : }
766 1126 : }
767 :
768 :
769 : /*
770 : * Given a list of tables that are going to be added to a publication,
771 : * verify that they fulfill the necessary preconditions, namely: no tables
772 : * have a column list if any schema is published; and partitioned tables do
773 : * not have column lists if publish_via_partition_root is not set.
774 : *
775 : * 'publish_schema' indicates that the publication contains any TABLES IN
776 : * SCHEMA elements (newly added in this command, or preexisting).
777 : * 'pubviaroot' is the value of publish_via_partition_root.
778 : */
779 : static void
780 1126 : CheckPubRelationColumnList(char *pubname, List *tables,
781 : bool publish_schema, bool pubviaroot)
782 : {
783 : ListCell *lc;
784 :
785 2298 : foreach(lc, tables)
786 : {
787 1202 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
788 :
789 1202 : if (pri->columns == NIL)
790 798 : continue;
791 :
792 : /*
793 : * Disallow specifying column list if any schema is in the
794 : * publication.
795 : *
796 : * XXX We could instead just forbid the case when the publication
797 : * tries to publish the table with a column list and a schema for that
798 : * table. However, if we do that then we need a restriction during
799 : * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
800 : * seem to be a good idea.
801 : */
802 404 : if (publish_schema)
803 24 : ereport(ERROR,
804 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
805 : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
806 : get_namespace_name(RelationGetNamespace(pri->relation)),
807 : RelationGetRelationName(pri->relation), pubname),
808 : errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
809 :
810 : /*
811 : * If the publication doesn't publish changes via the root partitioned
812 : * table, the partition's column list will be used. So disallow using
813 : * a column list on the partitioned table in this case.
814 : */
815 380 : if (!pubviaroot &&
816 304 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
817 6 : ereport(ERROR,
818 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
819 : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
820 : get_namespace_name(RelationGetNamespace(pri->relation)),
821 : RelationGetRelationName(pri->relation), pubname),
822 : errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
823 : "publish_via_partition_root")));
824 : }
825 1096 : }
826 :
827 : /*
828 : * Create new publication.
829 : */
830 : ObjectAddress
831 896 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
832 : {
833 : Relation rel;
834 : ObjectAddress myself;
835 : Oid puboid;
836 : bool nulls[Natts_pg_publication];
837 : Datum values[Natts_pg_publication];
838 : HeapTuple tup;
839 : bool publish_given;
840 : PublicationActions pubactions;
841 : bool publish_via_partition_root_given;
842 : bool publish_via_partition_root;
843 : bool publish_generated_columns_given;
844 : char publish_generated_columns;
845 : AclResult aclresult;
846 896 : List *relations = NIL;
847 896 : List *schemaidlist = NIL;
848 :
849 : /* must have CREATE privilege on database */
850 896 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
851 896 : if (aclresult != ACLCHECK_OK)
852 6 : aclcheck_error(aclresult, OBJECT_DATABASE,
853 6 : get_database_name(MyDatabaseId));
854 :
855 : /* FOR ALL TABLES and FOR ALL SEQUENCES requires superuser */
856 890 : if (!superuser())
857 : {
858 24 : if (stmt->for_all_tables || stmt->for_all_sequences)
859 0 : ereport(ERROR,
860 : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
861 : errmsg("must be superuser to create a FOR ALL TABLES or ALL SEQUENCES publication"));
862 : }
863 :
864 890 : rel = table_open(PublicationRelationId, RowExclusiveLock);
865 :
866 : /* Check if name is used */
867 890 : puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
868 : CStringGetDatum(stmt->pubname));
869 890 : if (OidIsValid(puboid))
870 6 : ereport(ERROR,
871 : (errcode(ERRCODE_DUPLICATE_OBJECT),
872 : errmsg("publication \"%s\" already exists",
873 : stmt->pubname)));
874 :
875 : /* Form a tuple. */
876 884 : memset(values, 0, sizeof(values));
877 884 : memset(nulls, false, sizeof(nulls));
878 :
879 884 : values[Anum_pg_publication_pubname - 1] =
880 884 : DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
881 884 : values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
882 :
883 884 : parse_publication_options(pstate,
884 : stmt->options,
885 : &publish_given, &pubactions,
886 : &publish_via_partition_root_given,
887 : &publish_via_partition_root,
888 : &publish_generated_columns_given,
889 : &publish_generated_columns);
890 :
891 848 : if (stmt->for_all_sequences &&
892 28 : (publish_given || publish_via_partition_root_given ||
893 : publish_generated_columns_given))
894 2 : ereport(NOTICE,
895 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
896 : errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
897 :
898 848 : puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
899 : Anum_pg_publication_oid);
900 848 : values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
901 848 : values[Anum_pg_publication_puballtables - 1] =
902 848 : BoolGetDatum(stmt->for_all_tables);
903 848 : values[Anum_pg_publication_puballsequences - 1] =
904 848 : BoolGetDatum(stmt->for_all_sequences);
905 848 : values[Anum_pg_publication_pubinsert - 1] =
906 848 : BoolGetDatum(pubactions.pubinsert);
907 848 : values[Anum_pg_publication_pubupdate - 1] =
908 848 : BoolGetDatum(pubactions.pubupdate);
909 848 : values[Anum_pg_publication_pubdelete - 1] =
910 848 : BoolGetDatum(pubactions.pubdelete);
911 848 : values[Anum_pg_publication_pubtruncate - 1] =
912 848 : BoolGetDatum(pubactions.pubtruncate);
913 848 : values[Anum_pg_publication_pubviaroot - 1] =
914 848 : BoolGetDatum(publish_via_partition_root);
915 848 : values[Anum_pg_publication_pubgencols - 1] =
916 848 : CharGetDatum(publish_generated_columns);
917 :
918 848 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
919 :
920 : /* Insert tuple into catalog. */
921 848 : CatalogTupleInsert(rel, tup);
922 848 : heap_freetuple(tup);
923 :
924 848 : recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
925 :
926 848 : ObjectAddressSet(myself, PublicationRelationId, puboid);
927 :
928 : /* Make the changes visible. */
929 848 : CommandCounterIncrement();
930 :
931 : /* Associate objects with the publication. */
932 848 : if (stmt->for_all_tables)
933 : {
934 : /*
935 : * Invalidate relcache so that publication info is rebuilt. Sequences
936 : * publication doesn't require invalidation, as replica identity
937 : * checks don't apply to them.
938 : */
939 106 : CacheInvalidateRelcacheAll();
940 : }
941 742 : else if (!stmt->for_all_sequences)
942 : {
943 722 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
944 : &schemaidlist);
945 :
946 : /* FOR TABLES IN SCHEMA requires superuser */
947 704 : if (schemaidlist != NIL && !superuser())
948 6 : ereport(ERROR,
949 : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
950 : errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
951 :
952 698 : if (relations != NIL)
953 : {
954 : List *rels;
955 :
956 474 : rels = OpenTableList(relations);
957 444 : TransformPubWhereClauses(rels, pstate->p_sourcetext,
958 : publish_via_partition_root);
959 :
960 420 : CheckPubRelationColumnList(stmt->pubname, rels,
961 : schemaidlist != NIL,
962 : publish_via_partition_root);
963 :
964 414 : PublicationAddTables(puboid, rels, true, NULL);
965 388 : CloseTableList(rels);
966 : }
967 :
968 612 : if (schemaidlist != NIL)
969 : {
970 : /*
971 : * Schema lock is held until the publication is created to prevent
972 : * concurrent schema deletion.
973 : */
974 152 : LockSchemaList(schemaidlist);
975 152 : PublicationAddSchemas(puboid, schemaidlist, true, NULL);
976 : }
977 : }
978 :
979 732 : table_close(rel, RowExclusiveLock);
980 :
981 732 : InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
982 :
983 : /*
984 : * We don't need this warning message when wal_level >= 'replica' since
985 : * logical decoding is automatically enabled up on a logical slot
986 : * creation.
987 : */
988 732 : if (wal_level < WAL_LEVEL_REPLICA)
989 26 : ereport(WARNING,
990 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
991 : errmsg("logical decoding must be enabled to publish logical changes"),
992 : errhint("Before creating subscriptions, ensure that \"wal_level\" is set to \"replica\" or higher.")));
993 :
994 732 : return myself;
995 : }
996 :
997 : /*
998 : * Change options of a publication.
999 : */
1000 : static void
1001 128 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
1002 : Relation rel, HeapTuple tup)
1003 : {
1004 : bool nulls[Natts_pg_publication];
1005 : bool replaces[Natts_pg_publication];
1006 : Datum values[Natts_pg_publication];
1007 : bool publish_given;
1008 : PublicationActions pubactions;
1009 : bool publish_via_partition_root_given;
1010 : bool publish_via_partition_root;
1011 : bool publish_generated_columns_given;
1012 : char publish_generated_columns;
1013 : ObjectAddress obj;
1014 : Form_pg_publication pubform;
1015 128 : List *root_relids = NIL;
1016 : ListCell *lc;
1017 :
1018 128 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1019 :
1020 128 : parse_publication_options(pstate,
1021 : stmt->options,
1022 : &publish_given, &pubactions,
1023 : &publish_via_partition_root_given,
1024 : &publish_via_partition_root,
1025 : &publish_generated_columns_given,
1026 : &publish_generated_columns);
1027 :
1028 128 : if (pubform->puballsequences &&
1029 12 : (publish_given || publish_via_partition_root_given ||
1030 : publish_generated_columns_given))
1031 12 : ereport(NOTICE,
1032 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1033 : errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
1034 :
1035 : /*
1036 : * If the publication doesn't publish changes via the root partitioned
1037 : * table, the partition's row filter and column list will be used. So
1038 : * disallow using WHERE clause and column lists on partitioned table in
1039 : * this case.
1040 : */
1041 128 : if (!pubform->puballtables && publish_via_partition_root_given &&
1042 80 : !publish_via_partition_root)
1043 : {
1044 : /*
1045 : * Lock the publication so nobody else can do anything with it. This
1046 : * prevents concurrent alter to add partitioned table(s) with WHERE
1047 : * clause(s) and/or column lists which we don't allow when not
1048 : * publishing via root.
1049 : */
1050 48 : LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
1051 : AccessShareLock);
1052 :
1053 48 : root_relids = GetPublicationRelations(pubform->oid,
1054 : PUBLICATION_PART_ROOT);
1055 :
1056 84 : foreach(lc, root_relids)
1057 : {
1058 48 : Oid relid = lfirst_oid(lc);
1059 : HeapTuple rftuple;
1060 : char relkind;
1061 : char *relname;
1062 : bool has_rowfilter;
1063 : bool has_collist;
1064 :
1065 : /*
1066 : * Beware: we don't have lock on the relations, so cope silently
1067 : * with the cache lookups returning NULL.
1068 : */
1069 :
1070 48 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1071 : ObjectIdGetDatum(relid),
1072 : ObjectIdGetDatum(pubform->oid));
1073 48 : if (!HeapTupleIsValid(rftuple))
1074 0 : continue;
1075 48 : has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
1076 48 : has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
1077 48 : if (!has_rowfilter && !has_collist)
1078 : {
1079 12 : ReleaseSysCache(rftuple);
1080 12 : continue;
1081 : }
1082 :
1083 36 : relkind = get_rel_relkind(relid);
1084 36 : if (relkind != RELKIND_PARTITIONED_TABLE)
1085 : {
1086 24 : ReleaseSysCache(rftuple);
1087 24 : continue;
1088 : }
1089 12 : relname = get_rel_name(relid);
1090 12 : if (relname == NULL) /* table concurrently dropped */
1091 : {
1092 0 : ReleaseSysCache(rftuple);
1093 0 : continue;
1094 : }
1095 :
1096 12 : if (has_rowfilter)
1097 6 : ereport(ERROR,
1098 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1099 : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1100 : "publish_via_partition_root",
1101 : stmt->pubname),
1102 : errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1103 : relname, "publish_via_partition_root")));
1104 : Assert(has_collist);
1105 6 : ereport(ERROR,
1106 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1107 : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1108 : "publish_via_partition_root",
1109 : stmt->pubname),
1110 : errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1111 : relname, "publish_via_partition_root")));
1112 : }
1113 : }
1114 :
1115 : /* Everything ok, form a new tuple. */
1116 116 : memset(values, 0, sizeof(values));
1117 116 : memset(nulls, false, sizeof(nulls));
1118 116 : memset(replaces, false, sizeof(replaces));
1119 :
1120 116 : if (publish_given)
1121 : {
1122 34 : values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
1123 34 : replaces[Anum_pg_publication_pubinsert - 1] = true;
1124 :
1125 34 : values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
1126 34 : replaces[Anum_pg_publication_pubupdate - 1] = true;
1127 :
1128 34 : values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
1129 34 : replaces[Anum_pg_publication_pubdelete - 1] = true;
1130 :
1131 34 : values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
1132 34 : replaces[Anum_pg_publication_pubtruncate - 1] = true;
1133 : }
1134 :
1135 116 : if (publish_via_partition_root_given)
1136 : {
1137 70 : values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
1138 70 : replaces[Anum_pg_publication_pubviaroot - 1] = true;
1139 : }
1140 :
1141 116 : if (publish_generated_columns_given)
1142 : {
1143 12 : values[Anum_pg_publication_pubgencols - 1] = CharGetDatum(publish_generated_columns);
1144 12 : replaces[Anum_pg_publication_pubgencols - 1] = true;
1145 : }
1146 :
1147 116 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1148 : replaces);
1149 :
1150 : /* Update the catalog. */
1151 116 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1152 :
1153 116 : CommandCounterIncrement();
1154 :
1155 116 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1156 :
1157 : /* Invalidate the relcache. */
1158 116 : if (pubform->puballtables)
1159 : {
1160 20 : CacheInvalidateRelcacheAll();
1161 : }
1162 : else
1163 : {
1164 96 : List *relids = NIL;
1165 96 : List *schemarelids = NIL;
1166 :
1167 : /*
1168 : * For any partitioned tables contained in the publication, we must
1169 : * invalidate all partitions contained in the respective partition
1170 : * trees, not just those explicitly mentioned in the publication.
1171 : */
1172 96 : if (root_relids == NIL)
1173 60 : relids = GetPublicationRelations(pubform->oid,
1174 : PUBLICATION_PART_ALL);
1175 : else
1176 : {
1177 : /*
1178 : * We already got tables explicitly mentioned in the publication.
1179 : * Now get all partitions for the partitioned table in the list.
1180 : */
1181 72 : foreach(lc, root_relids)
1182 36 : relids = GetPubPartitionOptionRelations(relids,
1183 : PUBLICATION_PART_ALL,
1184 : lfirst_oid(lc));
1185 : }
1186 :
1187 96 : schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
1188 : PUBLICATION_PART_ALL);
1189 96 : relids = list_concat_unique_oid(relids, schemarelids);
1190 :
1191 96 : InvalidatePublicationRels(relids);
1192 : }
1193 :
1194 116 : ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
1195 116 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1196 : (Node *) stmt);
1197 :
1198 116 : InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
1199 116 : }
1200 :
1201 : /*
1202 : * Invalidate the relations.
1203 : */
1204 : void
1205 2450 : InvalidatePublicationRels(List *relids)
1206 : {
1207 : /*
1208 : * We don't want to send too many individual messages, at some point it's
1209 : * cheaper to just reset whole relcache.
1210 : */
1211 2450 : if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
1212 : {
1213 : ListCell *lc;
1214 :
1215 20726 : foreach(lc, relids)
1216 18276 : CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
1217 : }
1218 : else
1219 0 : CacheInvalidateRelcacheAll();
1220 2450 : }
1221 :
1222 : /*
1223 : * Add or remove table to/from publication.
1224 : */
1225 : static void
1226 918 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
1227 : List *tables, const char *queryString,
1228 : bool publish_schema)
1229 : {
1230 918 : List *rels = NIL;
1231 918 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1232 918 : Oid pubid = pubform->oid;
1233 :
1234 : /*
1235 : * Nothing to do if no objects, except in SET: for that it is quite
1236 : * possible that user has not specified any tables in which case we need
1237 : * to remove all the existing tables.
1238 : */
1239 918 : if (!tables && stmt->action != AP_SetObjects)
1240 76 : return;
1241 :
1242 842 : rels = OpenTableList(tables);
1243 :
1244 842 : if (stmt->action == AP_AddObjects)
1245 : {
1246 296 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1247 :
1248 278 : publish_schema |= is_schema_publication(pubid);
1249 :
1250 278 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1251 278 : pubform->pubviaroot);
1252 :
1253 266 : PublicationAddTables(pubid, rels, false, stmt);
1254 : }
1255 546 : else if (stmt->action == AP_DropObjects)
1256 100 : PublicationDropTables(pubid, rels, false);
1257 : else /* AP_SetObjects */
1258 : {
1259 446 : List *oldrelids = GetPublicationRelations(pubid,
1260 : PUBLICATION_PART_ROOT);
1261 446 : List *delrels = NIL;
1262 : ListCell *oldlc;
1263 :
1264 446 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1265 :
1266 428 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1267 428 : pubform->pubviaroot);
1268 :
1269 : /*
1270 : * To recreate the relation list for the publication, look for
1271 : * existing relations that do not need to be dropped.
1272 : */
1273 806 : foreach(oldlc, oldrelids)
1274 : {
1275 402 : Oid oldrelid = lfirst_oid(oldlc);
1276 : ListCell *newlc;
1277 : PublicationRelInfo *oldrel;
1278 402 : bool found = false;
1279 : HeapTuple rftuple;
1280 402 : Node *oldrelwhereclause = NULL;
1281 402 : Bitmapset *oldcolumns = NULL;
1282 :
1283 : /* look up the cache for the old relmap */
1284 402 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1285 : ObjectIdGetDatum(oldrelid),
1286 : ObjectIdGetDatum(pubid));
1287 :
1288 : /*
1289 : * See if the existing relation currently has a WHERE clause or a
1290 : * column list. We need to compare those too.
1291 : */
1292 402 : if (HeapTupleIsValid(rftuple))
1293 : {
1294 402 : bool isnull = true;
1295 : Datum whereClauseDatum;
1296 : Datum columnListDatum;
1297 :
1298 : /* Load the WHERE clause for this table. */
1299 402 : whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1300 : Anum_pg_publication_rel_prqual,
1301 : &isnull);
1302 402 : if (!isnull)
1303 210 : oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
1304 :
1305 : /* Transform the int2vector column list to a bitmap. */
1306 402 : columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1307 : Anum_pg_publication_rel_prattrs,
1308 : &isnull);
1309 :
1310 402 : if (!isnull)
1311 136 : oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
1312 :
1313 402 : ReleaseSysCache(rftuple);
1314 : }
1315 :
1316 778 : foreach(newlc, rels)
1317 : {
1318 : PublicationRelInfo *newpubrel;
1319 : Oid newrelid;
1320 404 : Bitmapset *newcolumns = NULL;
1321 :
1322 404 : newpubrel = (PublicationRelInfo *) lfirst(newlc);
1323 404 : newrelid = RelationGetRelid(newpubrel->relation);
1324 :
1325 : /*
1326 : * Validate the column list. If the column list or WHERE
1327 : * clause changes, then the validation done here will be
1328 : * duplicated inside PublicationAddTables(). The validation
1329 : * is cheap enough that that seems harmless.
1330 : */
1331 404 : newcolumns = pub_collist_validate(newpubrel->relation,
1332 : newpubrel->columns);
1333 :
1334 : /*
1335 : * Check if any of the new set of relations matches with the
1336 : * existing relations in the publication. Additionally, if the
1337 : * relation has an associated WHERE clause, check the WHERE
1338 : * expressions also match. Same for the column list. Drop the
1339 : * rest.
1340 : */
1341 392 : if (newrelid == oldrelid)
1342 : {
1343 284 : if (equal(oldrelwhereclause, newpubrel->whereClause) &&
1344 80 : bms_equal(oldcolumns, newcolumns))
1345 : {
1346 16 : found = true;
1347 16 : break;
1348 : }
1349 : }
1350 : }
1351 :
1352 : /*
1353 : * Add the non-matched relations to a list so that they can be
1354 : * dropped.
1355 : */
1356 390 : if (!found)
1357 : {
1358 374 : oldrel = palloc_object(PublicationRelInfo);
1359 374 : oldrel->whereClause = NULL;
1360 374 : oldrel->columns = NIL;
1361 374 : oldrel->relation = table_open(oldrelid,
1362 : ShareUpdateExclusiveLock);
1363 374 : delrels = lappend(delrels, oldrel);
1364 : }
1365 : }
1366 :
1367 : /* And drop them. */
1368 404 : PublicationDropTables(pubid, delrels, true);
1369 :
1370 : /*
1371 : * Don't bother calculating the difference for adding, we'll catch and
1372 : * skip existing ones when doing catalog update.
1373 : */
1374 404 : PublicationAddTables(pubid, rels, true, stmt);
1375 :
1376 404 : CloseTableList(delrels);
1377 : }
1378 :
1379 710 : CloseTableList(rels);
1380 : }
1381 :
1382 : /*
1383 : * Alter the publication schemas.
1384 : *
1385 : * Add or remove schemas to/from publication.
1386 : */
1387 : static void
1388 786 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
1389 : HeapTuple tup, List *schemaidlist)
1390 : {
1391 786 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1392 :
1393 : /*
1394 : * Nothing to do if no objects, except in SET: for that it is quite
1395 : * possible that user has not specified any schemas in which case we need
1396 : * to remove all the existing schemas.
1397 : */
1398 786 : if (!schemaidlist && stmt->action != AP_SetObjects)
1399 306 : return;
1400 :
1401 : /*
1402 : * Schema lock is held until the publication is altered to prevent
1403 : * concurrent schema deletion.
1404 : */
1405 480 : LockSchemaList(schemaidlist);
1406 480 : if (stmt->action == AP_AddObjects)
1407 : {
1408 : ListCell *lc;
1409 : List *reloids;
1410 :
1411 38 : reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
1412 :
1413 54 : foreach(lc, reloids)
1414 : {
1415 : HeapTuple coltuple;
1416 :
1417 22 : coltuple = SearchSysCache2(PUBLICATIONRELMAP,
1418 : ObjectIdGetDatum(lfirst_oid(lc)),
1419 : ObjectIdGetDatum(pubform->oid));
1420 :
1421 22 : if (!HeapTupleIsValid(coltuple))
1422 0 : continue;
1423 :
1424 : /*
1425 : * Disallow adding schema if column list is already part of the
1426 : * publication. See CheckPubRelationColumnList.
1427 : */
1428 22 : if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
1429 6 : ereport(ERROR,
1430 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1431 : errmsg("cannot add schema to publication \"%s\"",
1432 : stmt->pubname),
1433 : errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1434 :
1435 16 : ReleaseSysCache(coltuple);
1436 : }
1437 :
1438 32 : PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
1439 : }
1440 442 : else if (stmt->action == AP_DropObjects)
1441 38 : PublicationDropSchemas(pubform->oid, schemaidlist, false);
1442 : else /* AP_SetObjects */
1443 : {
1444 404 : List *oldschemaids = GetPublicationSchemas(pubform->oid);
1445 404 : List *delschemas = NIL;
1446 :
1447 : /* Identify which schemas should be dropped */
1448 404 : delschemas = list_difference_oid(oldschemaids, schemaidlist);
1449 :
1450 : /*
1451 : * Schema lock is held until the publication is altered to prevent
1452 : * concurrent schema deletion.
1453 : */
1454 404 : LockSchemaList(delschemas);
1455 :
1456 : /* And drop them */
1457 404 : PublicationDropSchemas(pubform->oid, delschemas, true);
1458 :
1459 : /*
1460 : * Don't bother calculating the difference for adding, we'll catch and
1461 : * skip existing ones when doing catalog update.
1462 : */
1463 404 : PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
1464 : }
1465 : }
1466 :
1467 : /*
1468 : * Check if relations and schemas can be in a given publication and throw
1469 : * appropriate error if not.
1470 : */
1471 : static void
1472 960 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
1473 : List *tables, List *schemaidlist)
1474 : {
1475 960 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1476 :
1477 960 : if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
1478 104 : schemaidlist && !superuser())
1479 6 : ereport(ERROR,
1480 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1481 : errmsg("must be superuser to add or set schemas")));
1482 :
1483 : /*
1484 : * Check that user is allowed to manipulate the publication tables in
1485 : * schema
1486 : */
1487 954 : if (schemaidlist && (pubform->puballtables || pubform->puballsequences))
1488 : {
1489 18 : if (pubform->puballtables && pubform->puballsequences)
1490 0 : ereport(ERROR,
1491 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1492 : errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
1493 : NameStr(pubform->pubname)),
1494 : errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
1495 18 : else if (pubform->puballtables)
1496 18 : ereport(ERROR,
1497 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1498 : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1499 : NameStr(pubform->pubname)),
1500 : errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications."));
1501 : else
1502 0 : ereport(ERROR,
1503 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1504 : errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
1505 : NameStr(pubform->pubname)),
1506 : errdetail("Schemas cannot be added to or dropped from FOR ALL SEQUENCES publications."));
1507 : }
1508 :
1509 : /* Check that user is allowed to manipulate the publication tables. */
1510 936 : if (tables && (pubform->puballtables || pubform->puballsequences))
1511 : {
1512 18 : if (pubform->puballtables && pubform->puballsequences)
1513 0 : ereport(ERROR,
1514 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1515 : errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
1516 : NameStr(pubform->pubname)),
1517 : errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
1518 18 : else if (pubform->puballtables)
1519 18 : ereport(ERROR,
1520 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1521 : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1522 : NameStr(pubform->pubname)),
1523 : errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES publications."));
1524 : else
1525 0 : ereport(ERROR,
1526 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1527 : errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
1528 : NameStr(pubform->pubname)),
1529 : errdetail("Tables or sequences cannot be added to or dropped from FOR ALL SEQUENCES publications."));
1530 : }
1531 918 : }
1532 :
1533 : /*
1534 : * Alter the existing publication.
1535 : *
1536 : * This is dispatcher function for AlterPublicationOptions,
1537 : * AlterPublicationSchemas and AlterPublicationTables.
1538 : */
1539 : void
1540 1106 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
1541 : {
1542 : Relation rel;
1543 : HeapTuple tup;
1544 : Form_pg_publication pubform;
1545 :
1546 1106 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1547 :
1548 1106 : tup = SearchSysCacheCopy1(PUBLICATIONNAME,
1549 : CStringGetDatum(stmt->pubname));
1550 :
1551 1106 : if (!HeapTupleIsValid(tup))
1552 0 : ereport(ERROR,
1553 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1554 : errmsg("publication \"%s\" does not exist",
1555 : stmt->pubname)));
1556 :
1557 1106 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1558 :
1559 : /* must be owner */
1560 1106 : if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
1561 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
1562 0 : stmt->pubname);
1563 :
1564 1106 : if (stmt->options)
1565 128 : AlterPublicationOptions(pstate, stmt, rel, tup);
1566 : else
1567 : {
1568 978 : List *relations = NIL;
1569 978 : List *schemaidlist = NIL;
1570 978 : Oid pubid = pubform->oid;
1571 :
1572 978 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
1573 : &schemaidlist);
1574 :
1575 960 : CheckAlterPublication(stmt, tup, relations, schemaidlist);
1576 :
1577 918 : heap_freetuple(tup);
1578 :
1579 : /* Lock the publication so nobody else can do anything with it. */
1580 918 : LockDatabaseObject(PublicationRelationId, pubid, 0,
1581 : AccessExclusiveLock);
1582 :
1583 : /*
1584 : * It is possible that by the time we acquire the lock on publication,
1585 : * concurrent DDL has removed it. We can test this by checking the
1586 : * existence of publication. We get the tuple again to avoid the risk
1587 : * of any publication option getting changed.
1588 : */
1589 918 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1590 918 : if (!HeapTupleIsValid(tup))
1591 0 : ereport(ERROR,
1592 : errcode(ERRCODE_UNDEFINED_OBJECT),
1593 : errmsg("publication \"%s\" does not exist",
1594 : stmt->pubname));
1595 :
1596 918 : AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
1597 : schemaidlist != NIL);
1598 786 : AlterPublicationSchemas(stmt, tup, schemaidlist);
1599 : }
1600 :
1601 : /* Cleanup. */
1602 884 : heap_freetuple(tup);
1603 884 : table_close(rel, RowExclusiveLock);
1604 884 : }
1605 :
1606 : /*
1607 : * Remove relation from publication by mapping OID.
1608 : */
1609 : void
1610 848 : RemovePublicationRelById(Oid proid)
1611 : {
1612 : Relation rel;
1613 : HeapTuple tup;
1614 : Form_pg_publication_rel pubrel;
1615 848 : List *relids = NIL;
1616 :
1617 848 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
1618 :
1619 848 : tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
1620 :
1621 848 : if (!HeapTupleIsValid(tup))
1622 0 : elog(ERROR, "cache lookup failed for publication table %u",
1623 : proid);
1624 :
1625 848 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1626 :
1627 : /*
1628 : * Invalidate relcache so that publication info is rebuilt.
1629 : *
1630 : * For the partitioned tables, we must invalidate all partitions contained
1631 : * in the respective partition hierarchies, not just the one explicitly
1632 : * mentioned in the publication. This is required because we implicitly
1633 : * publish the child tables when the parent table is published.
1634 : */
1635 848 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
1636 : pubrel->prrelid);
1637 :
1638 848 : InvalidatePublicationRels(relids);
1639 :
1640 848 : CatalogTupleDelete(rel, &tup->t_self);
1641 :
1642 848 : ReleaseSysCache(tup);
1643 :
1644 848 : table_close(rel, RowExclusiveLock);
1645 848 : }
1646 :
1647 : /*
1648 : * Remove the publication by mapping OID.
1649 : */
1650 : void
1651 500 : RemovePublicationById(Oid pubid)
1652 : {
1653 : Relation rel;
1654 : HeapTuple tup;
1655 : Form_pg_publication pubform;
1656 :
1657 500 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1658 :
1659 500 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1660 500 : if (!HeapTupleIsValid(tup))
1661 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1662 :
1663 500 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1664 :
1665 : /* Invalidate relcache so that publication info is rebuilt. */
1666 500 : if (pubform->puballtables)
1667 70 : CacheInvalidateRelcacheAll();
1668 :
1669 500 : CatalogTupleDelete(rel, &tup->t_self);
1670 :
1671 500 : ReleaseSysCache(tup);
1672 :
1673 500 : table_close(rel, RowExclusiveLock);
1674 500 : }
1675 :
1676 : /*
1677 : * Remove schema from publication by mapping OID.
1678 : */
1679 : void
1680 192 : RemovePublicationSchemaById(Oid psoid)
1681 : {
1682 : Relation rel;
1683 : HeapTuple tup;
1684 192 : List *schemaRels = NIL;
1685 : Form_pg_publication_namespace pubsch;
1686 :
1687 192 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
1688 :
1689 192 : tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
1690 :
1691 192 : if (!HeapTupleIsValid(tup))
1692 0 : elog(ERROR, "cache lookup failed for publication schema %u", psoid);
1693 :
1694 192 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1695 :
1696 : /*
1697 : * Invalidate relcache so that publication info is rebuilt. See
1698 : * RemovePublicationRelById for why we need to consider all the
1699 : * partitions.
1700 : */
1701 192 : schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
1702 : PUBLICATION_PART_ALL);
1703 192 : InvalidatePublicationRels(schemaRels);
1704 :
1705 192 : CatalogTupleDelete(rel, &tup->t_self);
1706 :
1707 192 : ReleaseSysCache(tup);
1708 :
1709 192 : table_close(rel, RowExclusiveLock);
1710 192 : }
1711 :
1712 : /*
1713 : * Open relations specified by a PublicationTable list.
1714 : * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1715 : * add them to a publication.
1716 : */
1717 : static List *
1718 1316 : OpenTableList(List *tables)
1719 : {
1720 1316 : List *relids = NIL;
1721 1316 : List *rels = NIL;
1722 : ListCell *lc;
1723 1316 : List *relids_with_rf = NIL;
1724 1316 : List *relids_with_collist = NIL;
1725 :
1726 : /*
1727 : * Open, share-lock, and check all the explicitly-specified relations
1728 : */
1729 2700 : foreach(lc, tables)
1730 : {
1731 1414 : PublicationTable *t = lfirst_node(PublicationTable, lc);
1732 1414 : bool recurse = t->relation->inh;
1733 : Relation rel;
1734 : Oid myrelid;
1735 : PublicationRelInfo *pub_rel;
1736 :
1737 : /* Allow query cancel in case this takes a long time */
1738 1414 : CHECK_FOR_INTERRUPTS();
1739 :
1740 1414 : rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
1741 1408 : myrelid = RelationGetRelid(rel);
1742 :
1743 : /*
1744 : * Filter out duplicates if user specifies "foo, foo".
1745 : *
1746 : * Note that this algorithm is known to not be very efficient (O(N^2))
1747 : * but given that it only works on list of tables given to us by user
1748 : * it's deemed acceptable.
1749 : */
1750 1408 : if (list_member_oid(relids, myrelid))
1751 : {
1752 : /* Disallow duplicate tables if there are any with row filters. */
1753 24 : if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
1754 12 : ereport(ERROR,
1755 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1756 : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1757 : RelationGetRelationName(rel))));
1758 :
1759 : /* Disallow duplicate tables if there are any with column lists. */
1760 12 : if (t->columns || list_member_oid(relids_with_collist, myrelid))
1761 12 : ereport(ERROR,
1762 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1763 : errmsg("conflicting or redundant column lists for table \"%s\"",
1764 : RelationGetRelationName(rel))));
1765 :
1766 0 : table_close(rel, ShareUpdateExclusiveLock);
1767 0 : continue;
1768 : }
1769 :
1770 1384 : pub_rel = palloc_object(PublicationRelInfo);
1771 1384 : pub_rel->relation = rel;
1772 1384 : pub_rel->whereClause = t->whereClause;
1773 1384 : pub_rel->columns = t->columns;
1774 1384 : rels = lappend(rels, pub_rel);
1775 1384 : relids = lappend_oid(relids, myrelid);
1776 :
1777 1384 : if (t->whereClause)
1778 404 : relids_with_rf = lappend_oid(relids_with_rf, myrelid);
1779 :
1780 1384 : if (t->columns)
1781 410 : relids_with_collist = lappend_oid(relids_with_collist, myrelid);
1782 :
1783 : /*
1784 : * Add children of this rel, if requested, so that they too are added
1785 : * to the publication. A partitioned table can't have any inheritance
1786 : * children other than its partitions, which need not be explicitly
1787 : * added to the publication.
1788 : */
1789 1384 : if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1790 : {
1791 : List *children;
1792 : ListCell *child;
1793 :
1794 1160 : children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
1795 : NULL);
1796 :
1797 2328 : foreach(child, children)
1798 : {
1799 1168 : Oid childrelid = lfirst_oid(child);
1800 :
1801 : /* Allow query cancel in case this takes a long time */
1802 1168 : CHECK_FOR_INTERRUPTS();
1803 :
1804 : /*
1805 : * Skip duplicates if user specified both parent and child
1806 : * tables.
1807 : */
1808 1168 : if (list_member_oid(relids, childrelid))
1809 : {
1810 : /*
1811 : * We don't allow to specify row filter for both parent
1812 : * and child table at the same time as it is not very
1813 : * clear which one should be given preference.
1814 : */
1815 1160 : if (childrelid != myrelid &&
1816 0 : (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
1817 0 : ereport(ERROR,
1818 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1819 : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1820 : RelationGetRelationName(rel))));
1821 :
1822 : /*
1823 : * We don't allow to specify column list for both parent
1824 : * and child table at the same time as it is not very
1825 : * clear which one should be given preference.
1826 : */
1827 1160 : if (childrelid != myrelid &&
1828 0 : (t->columns || list_member_oid(relids_with_collist, childrelid)))
1829 0 : ereport(ERROR,
1830 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1831 : errmsg("conflicting or redundant column lists for table \"%s\"",
1832 : RelationGetRelationName(rel))));
1833 :
1834 1160 : continue;
1835 : }
1836 :
1837 : /* find_all_inheritors already got lock */
1838 8 : rel = table_open(childrelid, NoLock);
1839 8 : pub_rel = palloc_object(PublicationRelInfo);
1840 8 : pub_rel->relation = rel;
1841 : /* child inherits WHERE clause from parent */
1842 8 : pub_rel->whereClause = t->whereClause;
1843 :
1844 : /* child inherits column list from parent */
1845 8 : pub_rel->columns = t->columns;
1846 8 : rels = lappend(rels, pub_rel);
1847 8 : relids = lappend_oid(relids, childrelid);
1848 :
1849 8 : if (t->whereClause)
1850 2 : relids_with_rf = lappend_oid(relids_with_rf, childrelid);
1851 :
1852 8 : if (t->columns)
1853 0 : relids_with_collist = lappend_oid(relids_with_collist, childrelid);
1854 : }
1855 : }
1856 : }
1857 :
1858 1286 : list_free(relids);
1859 1286 : list_free(relids_with_rf);
1860 :
1861 1286 : return rels;
1862 : }
1863 :
1864 : /*
1865 : * Close all relations in the list.
1866 : */
1867 : static void
1868 1502 : CloseTableList(List *rels)
1869 : {
1870 : ListCell *lc;
1871 :
1872 3056 : foreach(lc, rels)
1873 : {
1874 : PublicationRelInfo *pub_rel;
1875 :
1876 1554 : pub_rel = (PublicationRelInfo *) lfirst(lc);
1877 1554 : table_close(pub_rel->relation, NoLock);
1878 : }
1879 :
1880 1502 : list_free_deep(rels);
1881 1502 : }
1882 :
1883 : /*
1884 : * Lock the schemas specified in the schema list in AccessShareLock mode in
1885 : * order to prevent concurrent schema deletion.
1886 : */
1887 : static void
1888 1036 : LockSchemaList(List *schemalist)
1889 : {
1890 : ListCell *lc;
1891 :
1892 1360 : foreach(lc, schemalist)
1893 : {
1894 324 : Oid schemaid = lfirst_oid(lc);
1895 :
1896 : /* Allow query cancel in case this takes a long time */
1897 324 : CHECK_FOR_INTERRUPTS();
1898 324 : LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
1899 :
1900 : /*
1901 : * It is possible that by the time we acquire the lock on schema,
1902 : * concurrent DDL has removed it. We can test this by checking the
1903 : * existence of schema.
1904 : */
1905 324 : if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
1906 0 : ereport(ERROR,
1907 : errcode(ERRCODE_UNDEFINED_SCHEMA),
1908 : errmsg("schema with OID %u does not exist", schemaid));
1909 : }
1910 1036 : }
1911 :
1912 : /*
1913 : * Add listed tables to the publication.
1914 : */
1915 : static void
1916 1084 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
1917 : AlterPublicationStmt *stmt)
1918 : {
1919 : ListCell *lc;
1920 :
1921 2176 : foreach(lc, rels)
1922 : {
1923 1160 : PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
1924 1160 : Relation rel = pub_rel->relation;
1925 : ObjectAddress obj;
1926 :
1927 : /* Must be owner of the table or superuser. */
1928 1160 : if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
1929 6 : aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
1930 6 : RelationGetRelationName(rel));
1931 :
1932 1154 : obj = publication_add_relation(pubid, pub_rel, if_not_exists);
1933 1092 : if (stmt)
1934 : {
1935 628 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1936 : (Node *) stmt);
1937 :
1938 628 : InvokeObjectPostCreateHook(PublicationRelRelationId,
1939 : obj.objectId, 0);
1940 : }
1941 : }
1942 1016 : }
1943 :
1944 : /*
1945 : * Remove listed tables from the publication.
1946 : */
1947 : static void
1948 504 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
1949 : {
1950 : ObjectAddress obj;
1951 : ListCell *lc;
1952 : Oid prid;
1953 :
1954 966 : foreach(lc, rels)
1955 : {
1956 480 : PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
1957 480 : Relation rel = pubrel->relation;
1958 480 : Oid relid = RelationGetRelid(rel);
1959 :
1960 480 : if (pubrel->columns)
1961 0 : ereport(ERROR,
1962 : errcode(ERRCODE_SYNTAX_ERROR),
1963 : errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
1964 :
1965 480 : prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
1966 : ObjectIdGetDatum(relid),
1967 : ObjectIdGetDatum(pubid));
1968 480 : if (!OidIsValid(prid))
1969 : {
1970 12 : if (missing_ok)
1971 0 : continue;
1972 :
1973 12 : ereport(ERROR,
1974 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1975 : errmsg("relation \"%s\" is not part of the publication",
1976 : RelationGetRelationName(rel))));
1977 : }
1978 :
1979 468 : if (pubrel->whereClause)
1980 6 : ereport(ERROR,
1981 : (errcode(ERRCODE_SYNTAX_ERROR),
1982 : errmsg("cannot use a WHERE clause when removing a table from a publication")));
1983 :
1984 462 : ObjectAddressSet(obj, PublicationRelRelationId, prid);
1985 462 : performDeletion(&obj, DROP_CASCADE, 0);
1986 : }
1987 486 : }
1988 :
1989 : /*
1990 : * Add listed schemas to the publication.
1991 : */
1992 : static void
1993 588 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
1994 : AlterPublicationStmt *stmt)
1995 : {
1996 : ListCell *lc;
1997 :
1998 838 : foreach(lc, schemas)
1999 : {
2000 262 : Oid schemaid = lfirst_oid(lc);
2001 : ObjectAddress obj;
2002 :
2003 262 : obj = publication_add_schema(pubid, schemaid, if_not_exists);
2004 250 : if (stmt)
2005 : {
2006 68 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
2007 : (Node *) stmt);
2008 :
2009 68 : InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
2010 : obj.objectId, 0);
2011 : }
2012 : }
2013 576 : }
2014 :
2015 : /*
2016 : * Remove listed schemas from the publication.
2017 : */
2018 : static void
2019 442 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
2020 : {
2021 : ObjectAddress obj;
2022 : ListCell *lc;
2023 : Oid psid;
2024 :
2025 492 : foreach(lc, schemas)
2026 : {
2027 56 : Oid schemaid = lfirst_oid(lc);
2028 :
2029 56 : psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
2030 : Anum_pg_publication_namespace_oid,
2031 : ObjectIdGetDatum(schemaid),
2032 : ObjectIdGetDatum(pubid));
2033 56 : if (!OidIsValid(psid))
2034 : {
2035 6 : if (missing_ok)
2036 0 : continue;
2037 :
2038 6 : ereport(ERROR,
2039 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2040 : errmsg("tables from schema \"%s\" are not part of the publication",
2041 : get_namespace_name(schemaid))));
2042 : }
2043 :
2044 50 : ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
2045 50 : performDeletion(&obj, DROP_CASCADE, 0);
2046 : }
2047 436 : }
2048 :
2049 : /*
2050 : * Internal workhorse for changing a publication owner
2051 : */
2052 : static void
2053 36 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
2054 : {
2055 : Form_pg_publication form;
2056 :
2057 36 : form = (Form_pg_publication) GETSTRUCT(tup);
2058 :
2059 36 : if (form->pubowner == newOwnerId)
2060 12 : return;
2061 :
2062 24 : if (!superuser())
2063 : {
2064 : AclResult aclresult;
2065 :
2066 : /* Must be owner */
2067 12 : if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
2068 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
2069 0 : NameStr(form->pubname));
2070 :
2071 : /* Must be able to become new owner */
2072 12 : check_can_set_role(GetUserId(), newOwnerId);
2073 :
2074 : /* New owner must have CREATE privilege on database */
2075 12 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
2076 12 : if (aclresult != ACLCHECK_OK)
2077 0 : aclcheck_error(aclresult, OBJECT_DATABASE,
2078 0 : get_database_name(MyDatabaseId));
2079 :
2080 12 : if (!superuser_arg(newOwnerId))
2081 : {
2082 12 : if (form->puballtables || form->puballsequences ||
2083 6 : is_schema_publication(form->oid))
2084 6 : ereport(ERROR,
2085 : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2086 : errmsg("permission denied to change owner of publication \"%s\"",
2087 : NameStr(form->pubname)),
2088 : errhint("The owner of a FOR ALL TABLES or ALL SEQUENCES or TABLES IN SCHEMA publication must be a superuser."));
2089 : }
2090 : }
2091 :
2092 18 : form->pubowner = newOwnerId;
2093 18 : CatalogTupleUpdate(rel, &tup->t_self, tup);
2094 :
2095 : /* Update owner dependency reference */
2096 18 : changeDependencyOnOwner(PublicationRelationId,
2097 : form->oid,
2098 : newOwnerId);
2099 :
2100 18 : InvokeObjectPostAlterHook(PublicationRelationId,
2101 : form->oid, 0);
2102 : }
2103 :
2104 : /*
2105 : * Change publication owner -- by name
2106 : */
2107 : ObjectAddress
2108 36 : AlterPublicationOwner(const char *name, Oid newOwnerId)
2109 : {
2110 : Oid pubid;
2111 : HeapTuple tup;
2112 : Relation rel;
2113 : ObjectAddress address;
2114 : Form_pg_publication pubform;
2115 :
2116 36 : rel = table_open(PublicationRelationId, RowExclusiveLock);
2117 :
2118 36 : tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
2119 :
2120 36 : if (!HeapTupleIsValid(tup))
2121 0 : ereport(ERROR,
2122 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2123 : errmsg("publication \"%s\" does not exist", name)));
2124 :
2125 36 : pubform = (Form_pg_publication) GETSTRUCT(tup);
2126 36 : pubid = pubform->oid;
2127 :
2128 36 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
2129 :
2130 30 : ObjectAddressSet(address, PublicationRelationId, pubid);
2131 :
2132 30 : heap_freetuple(tup);
2133 :
2134 30 : table_close(rel, RowExclusiveLock);
2135 :
2136 30 : return address;
2137 : }
2138 :
2139 : /*
2140 : * Change publication owner -- by OID
2141 : */
2142 : void
2143 0 : AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId)
2144 : {
2145 : HeapTuple tup;
2146 : Relation rel;
2147 :
2148 0 : rel = table_open(PublicationRelationId, RowExclusiveLock);
2149 :
2150 0 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
2151 :
2152 0 : if (!HeapTupleIsValid(tup))
2153 0 : ereport(ERROR,
2154 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2155 : errmsg("publication with OID %u does not exist", pubid)));
2156 :
2157 0 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
2158 :
2159 0 : heap_freetuple(tup);
2160 :
2161 0 : table_close(rel, RowExclusiveLock);
2162 0 : }
2163 :
2164 : /*
2165 : * Extract the publish_generated_columns option value from a DefElem. "stored"
2166 : * and "none" values are accepted.
2167 : */
2168 : static char
2169 76 : defGetGeneratedColsOption(DefElem *def)
2170 : {
2171 76 : char *sval = "";
2172 :
2173 : /*
2174 : * A parameter value is required.
2175 : */
2176 76 : if (def->arg)
2177 : {
2178 70 : sval = defGetString(def);
2179 :
2180 70 : if (pg_strcasecmp(sval, "none") == 0)
2181 22 : return PUBLISH_GENCOLS_NONE;
2182 48 : if (pg_strcasecmp(sval, "stored") == 0)
2183 42 : return PUBLISH_GENCOLS_STORED;
2184 : }
2185 :
2186 12 : ereport(ERROR,
2187 : errcode(ERRCODE_SYNTAX_ERROR),
2188 : errmsg("invalid value for publication parameter \"%s\": \"%s\"", def->defname, sval),
2189 : errdetail("Valid values are \"%s\" and \"%s\".", "none", "stored"));
2190 :
2191 : return PUBLISH_GENCOLS_NONE; /* keep compiler quiet */
2192 : }
|