Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * publicationcmds.c
4 : * publication manipulation
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/publicationcmds.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/htup_details.h"
18 : #include "access/table.h"
19 : #include "access/xact.h"
20 : #include "catalog/catalog.h"
21 : #include "catalog/indexing.h"
22 : #include "catalog/namespace.h"
23 : #include "catalog/objectaccess.h"
24 : #include "catalog/objectaddress.h"
25 : #include "catalog/pg_database.h"
26 : #include "catalog/pg_inherits.h"
27 : #include "catalog/pg_namespace.h"
28 : #include "catalog/pg_proc.h"
29 : #include "catalog/pg_publication.h"
30 : #include "catalog/pg_publication_namespace.h"
31 : #include "catalog/pg_publication_rel.h"
32 : #include "commands/defrem.h"
33 : #include "commands/event_trigger.h"
34 : #include "commands/publicationcmds.h"
35 : #include "miscadmin.h"
36 : #include "nodes/nodeFuncs.h"
37 : #include "parser/parse_clause.h"
38 : #include "parser/parse_collate.h"
39 : #include "parser/parse_relation.h"
40 : #include "rewrite/rewriteHandler.h"
41 : #include "storage/lmgr.h"
42 : #include "utils/acl.h"
43 : #include "utils/builtins.h"
44 : #include "utils/inval.h"
45 : #include "utils/lsyscache.h"
46 : #include "utils/rel.h"
47 : #include "utils/syscache.h"
48 : #include "utils/varlena.h"
49 :
50 :
51 : /*
52 : * Information used to validate the columns in the row filter expression. See
53 : * contain_invalid_rfcolumn_walker for details.
54 : */
55 : typedef struct rf_context
56 : {
57 : Bitmapset *bms_replident; /* bitset of replica identity columns */
58 : bool pubviaroot; /* true if we are validating the parent
59 : * relation's row filter */
60 : Oid relid; /* relid of the relation */
61 : Oid parentid; /* relid of the parent relation */
62 : } rf_context;
63 :
64 : static List *OpenTableList(List *tables);
65 : static void CloseTableList(List *rels);
66 : static void LockSchemaList(List *schemalist);
67 : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
68 : AlterPublicationStmt *stmt);
69 : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
70 : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
71 : AlterPublicationStmt *stmt);
72 : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
73 : static char defGetGeneratedColsOption(DefElem *def);
74 :
75 :
76 : static void
77 506 : parse_publication_options(ParseState *pstate,
78 : List *options,
79 : bool *publish_given,
80 : PublicationActions *pubactions,
81 : bool *publish_via_partition_root_given,
82 : bool *publish_via_partition_root,
83 : bool *publish_generated_columns_given,
84 : char *publish_generated_columns)
85 : {
86 : ListCell *lc;
87 :
88 506 : *publish_given = false;
89 506 : *publish_via_partition_root_given = false;
90 506 : *publish_generated_columns_given = false;
91 :
92 : /* defaults */
93 506 : pubactions->pubinsert = true;
94 506 : pubactions->pubupdate = true;
95 506 : pubactions->pubdelete = true;
96 506 : pubactions->pubtruncate = true;
97 506 : *publish_via_partition_root = false;
98 506 : *publish_generated_columns = PUBLISH_GENCOLS_NONE;
99 :
100 : /* Parse options */
101 688 : foreach(lc, options)
102 : {
103 200 : DefElem *defel = (DefElem *) lfirst(lc);
104 :
105 200 : if (strcmp(defel->defname, "publish") == 0)
106 : {
107 : char *publish;
108 : List *publish_list;
109 : ListCell *lc2;
110 :
111 70 : if (*publish_given)
112 0 : errorConflictingDefElem(defel, pstate);
113 :
114 : /*
115 : * If publish option was given only the explicitly listed actions
116 : * should be published.
117 : */
118 70 : pubactions->pubinsert = false;
119 70 : pubactions->pubupdate = false;
120 70 : pubactions->pubdelete = false;
121 70 : pubactions->pubtruncate = false;
122 :
123 70 : *publish_given = true;
124 :
125 : /*
126 : * SplitIdentifierString destructively modifies its input, so make
127 : * a copy so we don't modify the memory of the executing statement
128 : */
129 70 : publish = pstrdup(defGetString(defel));
130 :
131 70 : if (!SplitIdentifierString(publish, ',', &publish_list))
132 0 : ereport(ERROR,
133 : (errcode(ERRCODE_SYNTAX_ERROR),
134 : errmsg("invalid list syntax in parameter \"%s\"",
135 : "publish")));
136 :
137 : /* Process the option list. */
138 167 : foreach(lc2, publish_list)
139 : {
140 100 : char *publish_opt = (char *) lfirst(lc2);
141 :
142 100 : if (strcmp(publish_opt, "insert") == 0)
143 62 : pubactions->pubinsert = true;
144 38 : else if (strcmp(publish_opt, "update") == 0)
145 15 : pubactions->pubupdate = true;
146 23 : else if (strcmp(publish_opt, "delete") == 0)
147 10 : pubactions->pubdelete = true;
148 13 : else if (strcmp(publish_opt, "truncate") == 0)
149 10 : pubactions->pubtruncate = true;
150 : else
151 3 : ereport(ERROR,
152 : (errcode(ERRCODE_SYNTAX_ERROR),
153 : errmsg("unrecognized value for publication option \"%s\": \"%s\"",
154 : "publish", publish_opt)));
155 : }
156 : }
157 130 : else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
158 : {
159 86 : if (*publish_via_partition_root_given)
160 3 : errorConflictingDefElem(defel, pstate);
161 83 : *publish_via_partition_root_given = true;
162 83 : *publish_via_partition_root = defGetBoolean(defel);
163 : }
164 44 : else if (strcmp(defel->defname, "publish_generated_columns") == 0)
165 : {
166 41 : if (*publish_generated_columns_given)
167 3 : errorConflictingDefElem(defel, pstate);
168 38 : *publish_generated_columns_given = true;
169 38 : *publish_generated_columns = defGetGeneratedColsOption(defel);
170 : }
171 : else
172 3 : ereport(ERROR,
173 : (errcode(ERRCODE_SYNTAX_ERROR),
174 : errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
175 : }
176 488 : }
177 :
178 : /*
179 : * Convert the PublicationObjSpecType list into schema oid list and
180 : * PublicationTable list.
181 : */
182 : static void
183 850 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
184 : List **rels, List **schemas)
185 : {
186 : ListCell *cell;
187 : PublicationObjSpec *pubobj;
188 :
189 850 : if (!pubobjspec_list)
190 52 : return;
191 :
192 1694 : foreach(cell, pubobjspec_list)
193 : {
194 : Oid schemaid;
195 : List *search_path;
196 :
197 914 : pubobj = (PublicationObjSpec *) lfirst(cell);
198 :
199 914 : switch (pubobj->pubobjtype)
200 : {
201 716 : case PUBLICATIONOBJ_TABLE:
202 716 : *rels = lappend(*rels, pubobj->pubtable);
203 716 : break;
204 186 : case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
205 186 : schemaid = get_namespace_oid(pubobj->name, false);
206 :
207 : /* Filter out duplicates if user specifies "sch1, sch1" */
208 171 : *schemas = list_append_unique_oid(*schemas, schemaid);
209 171 : break;
210 12 : case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
211 12 : search_path = fetch_search_path(false);
212 12 : if (search_path == NIL) /* nothing valid in search_path? */
213 3 : ereport(ERROR,
214 : errcode(ERRCODE_UNDEFINED_SCHEMA),
215 : errmsg("no schema has been selected for CURRENT_SCHEMA"));
216 :
217 9 : schemaid = linitial_oid(search_path);
218 9 : list_free(search_path);
219 :
220 : /* Filter out duplicates if user specifies "sch1, sch1" */
221 9 : *schemas = list_append_unique_oid(*schemas, schemaid);
222 9 : break;
223 0 : default:
224 : /* shouldn't happen */
225 0 : elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
226 : break;
227 : }
228 : }
229 : }
230 :
231 : /*
232 : * Returns true if any of the columns used in the row filter WHERE expression is
233 : * not part of REPLICA IDENTITY, false otherwise.
234 : */
235 : static bool
236 129 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
237 : {
238 129 : if (node == NULL)
239 0 : return false;
240 :
241 129 : if (IsA(node, Var))
242 : {
243 52 : Var *var = (Var *) node;
244 52 : AttrNumber attnum = var->varattno;
245 :
246 : /*
247 : * If pubviaroot is true, we are validating the row filter of the
248 : * parent table, but the bitmap contains the replica identity
249 : * information of the child table. So, get the column number of the
250 : * child table as parent and child column order could be different.
251 : */
252 52 : if (context->pubviaroot)
253 : {
254 8 : char *colname = get_attname(context->parentid, attnum, false);
255 :
256 8 : attnum = get_attnum(context->relid, colname);
257 : }
258 :
259 52 : if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
260 52 : context->bms_replident))
261 30 : return true;
262 : }
263 :
264 99 : return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
265 : context);
266 : }
267 :
268 : /*
269 : * Check if all columns referenced in the filter expression are part of the
270 : * REPLICA IDENTITY index or not.
271 : *
272 : * Returns true if any invalid column is found.
273 : */
274 : bool
275 363 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
276 : bool pubviaroot)
277 : {
278 : HeapTuple rftuple;
279 363 : Oid relid = RelationGetRelid(relation);
280 363 : Oid publish_as_relid = RelationGetRelid(relation);
281 363 : bool result = false;
282 : Datum rfdatum;
283 : bool rfisnull;
284 :
285 : /*
286 : * FULL means all columns are in the REPLICA IDENTITY, so all columns are
287 : * allowed in the row filter and we can skip the validation.
288 : */
289 363 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
290 99 : return false;
291 :
292 : /*
293 : * For a partition, if pubviaroot is true, find the topmost ancestor that
294 : * is published via this publication as we need to use its row filter
295 : * expression to filter the partition's changes.
296 : *
297 : * Note that even though the row filter used is for an ancestor, the
298 : * REPLICA IDENTITY used will be for the actual child table.
299 : */
300 264 : if (pubviaroot && relation->rd_rel->relispartition)
301 : {
302 : publish_as_relid
303 54 : = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
304 :
305 54 : if (!OidIsValid(publish_as_relid))
306 3 : publish_as_relid = relid;
307 : }
308 :
309 264 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
310 : ObjectIdGetDatum(publish_as_relid),
311 : ObjectIdGetDatum(pubid));
312 :
313 264 : if (!HeapTupleIsValid(rftuple))
314 28 : return false;
315 :
316 236 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
317 : Anum_pg_publication_rel_prqual,
318 : &rfisnull);
319 :
320 236 : if (!rfisnull)
321 : {
322 51 : rf_context context = {0};
323 : Node *rfnode;
324 51 : Bitmapset *bms = NULL;
325 :
326 51 : context.pubviaroot = pubviaroot;
327 51 : context.parentid = publish_as_relid;
328 51 : context.relid = relid;
329 :
330 : /* Remember columns that are part of the REPLICA IDENTITY */
331 51 : bms = RelationGetIndexAttrBitmap(relation,
332 : INDEX_ATTR_BITMAP_IDENTITY_KEY);
333 :
334 51 : context.bms_replident = bms;
335 51 : rfnode = stringToNode(TextDatumGetCString(rfdatum));
336 51 : result = contain_invalid_rfcolumn_walker(rfnode, &context);
337 : }
338 :
339 236 : ReleaseSysCache(rftuple);
340 :
341 236 : return result;
342 : }
343 :
344 : /*
345 : * Check for invalid columns in the publication table definition.
346 : *
347 : * This function evaluates two conditions:
348 : *
349 : * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
350 : * by the column list. If any column is missing, *invalid_column_list is set
351 : * to true.
352 : * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
353 : * are published, either by being explicitly named in the column list or, if
354 : * no column list is specified, by setting the option
355 : * publish_generated_columns to stored. If any unpublished
356 : * generated column is found, *invalid_gen_col is set to true.
357 : *
358 : * Returns true if any of the above conditions are not met.
359 : */
360 : bool
361 471 : pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
362 : bool pubviaroot, char pubgencols_type,
363 : bool *invalid_column_list,
364 : bool *invalid_gen_col)
365 : {
366 471 : Oid relid = RelationGetRelid(relation);
367 471 : Oid publish_as_relid = RelationGetRelid(relation);
368 : Bitmapset *idattrs;
369 471 : Bitmapset *columns = NULL;
370 471 : TupleDesc desc = RelationGetDescr(relation);
371 : Publication *pub;
372 : int x;
373 :
374 471 : *invalid_column_list = false;
375 471 : *invalid_gen_col = false;
376 :
377 : /*
378 : * For a partition, if pubviaroot is true, find the topmost ancestor that
379 : * is published via this publication as we need to use its column list for
380 : * the changes.
381 : *
382 : * Note that even though the column list used is for an ancestor, the
383 : * REPLICA IDENTITY used will be for the actual child table.
384 : */
385 471 : if (pubviaroot && relation->rd_rel->relispartition)
386 : {
387 80 : publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
388 :
389 80 : if (!OidIsValid(publish_as_relid))
390 18 : publish_as_relid = relid;
391 : }
392 :
393 : /* Fetch the column list */
394 471 : pub = GetPublication(pubid);
395 471 : check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
396 :
397 471 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
398 : {
399 : /* With REPLICA IDENTITY FULL, no column list is allowed. */
400 106 : *invalid_column_list = (columns != NULL);
401 :
402 : /*
403 : * As we don't allow a column list with REPLICA IDENTITY FULL, the
404 : * publish_generated_columns option must be set to stored if the table
405 : * has any stored generated columns.
406 : */
407 106 : if (pubgencols_type != PUBLISH_GENCOLS_STORED &&
408 100 : relation->rd_att->constr &&
409 44 : relation->rd_att->constr->has_generated_stored)
410 3 : *invalid_gen_col = true;
411 :
412 : /*
413 : * Virtual generated columns are currently not supported for logical
414 : * replication at all.
415 : */
416 106 : if (relation->rd_att->constr &&
417 50 : relation->rd_att->constr->has_generated_virtual)
418 6 : *invalid_gen_col = true;
419 :
420 106 : if (*invalid_gen_col && *invalid_column_list)
421 0 : return true;
422 : }
423 :
424 : /* Remember columns that are part of the REPLICA IDENTITY */
425 471 : idattrs = RelationGetIndexAttrBitmap(relation,
426 : INDEX_ATTR_BITMAP_IDENTITY_KEY);
427 :
428 : /*
429 : * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
430 : * (to handle system columns the usual way), while column list does not
431 : * use offset, so we can't do bms_is_subset(). Instead, we have to loop
432 : * over the idattrs and check all of them are in the list.
433 : */
434 471 : x = -1;
435 799 : while ((x = bms_next_member(idattrs, x)) >= 0)
436 : {
437 331 : AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber);
438 331 : Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
439 :
440 331 : if (columns == NULL)
441 : {
442 : /*
443 : * The publish_generated_columns option must be set to stored if
444 : * the REPLICA IDENTITY contains any stored generated column.
445 : */
446 226 : if (att->attgenerated == ATTRIBUTE_GENERATED_STORED && pubgencols_type != PUBLISH_GENCOLS_STORED)
447 : {
448 3 : *invalid_gen_col = true;
449 3 : break;
450 : }
451 :
452 : /*
453 : * The equivalent setting for virtual generated columns does not
454 : * exist yet.
455 : */
456 223 : if (att->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
457 : {
458 0 : *invalid_gen_col = true;
459 0 : break;
460 : }
461 :
462 : /* Skip validating the column list since it is not defined */
463 223 : continue;
464 : }
465 :
466 : /*
467 : * If pubviaroot is true, we are validating the column list of the
468 : * parent table, but the bitmap contains the replica identity
469 : * information of the child table. The parent/child attnums may not
470 : * match, so translate them to the parent - get the attname from the
471 : * child, and look it up in the parent.
472 : */
473 105 : if (pubviaroot)
474 : {
475 : /* attribute name in the child table */
476 40 : char *colname = get_attname(relid, attnum, false);
477 :
478 : /*
479 : * Determine the attnum for the attribute name in parent (we are
480 : * using the column list defined on the parent).
481 : */
482 40 : attnum = get_attnum(publish_as_relid, colname);
483 : }
484 :
485 : /* replica identity column, not covered by the column list */
486 105 : *invalid_column_list |= !bms_is_member(attnum, columns);
487 :
488 105 : if (*invalid_column_list && *invalid_gen_col)
489 0 : break;
490 : }
491 :
492 471 : bms_free(columns);
493 471 : bms_free(idattrs);
494 :
495 471 : return *invalid_column_list || *invalid_gen_col;
496 : }
497 :
498 : /*
499 : * Invalidate entries in the RelationSyncCache for relations included in the
500 : * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
501 : *
502 : * If 'puballtables' is true, invalidate all cache entries.
503 : */
504 : void
505 18 : InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
506 : {
507 18 : if (puballtables)
508 : {
509 3 : CacheInvalidateRelSyncAll();
510 : }
511 : else
512 : {
513 15 : List *relids = NIL;
514 15 : List *schemarelids = NIL;
515 :
516 : /*
517 : * For partitioned tables, we must invalidate all partitions and
518 : * itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as
519 : * a target. However, WAL records for TRUNCATE specify both a root and
520 : * its leaves.
521 : */
522 15 : relids = GetPublicationRelations(pubid,
523 : PUBLICATION_PART_ALL);
524 15 : schemarelids = GetAllSchemaPublicationRelations(pubid,
525 : PUBLICATION_PART_ALL);
526 :
527 15 : relids = list_concat_unique_oid(relids, schemarelids);
528 :
529 : /* Invalidate the relsyncache */
530 33 : foreach_oid(relid, relids)
531 3 : CacheInvalidateRelSync(relid);
532 : }
533 :
534 18 : return;
535 : }
536 :
537 : /* check_functions_in_node callback */
538 : static bool
539 218 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
540 : {
541 218 : return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
542 : func_id >= FirstNormalObjectId);
543 : }
544 :
545 : /*
546 : * The row filter walker checks if the row filter expression is a "simple
547 : * expression".
548 : *
549 : * It allows only simple or compound expressions such as:
550 : * - (Var Op Const)
551 : * - (Var Op Var)
552 : * - (Var Op Const) AND/OR (Var Op Const)
553 : * - etc
554 : * (where Var is a column of the table this filter belongs to)
555 : *
556 : * The simple expression has the following restrictions:
557 : * - User-defined operators are not allowed;
558 : * - User-defined functions are not allowed;
559 : * - User-defined types are not allowed;
560 : * - User-defined collations are not allowed;
561 : * - Non-immutable built-in functions are not allowed;
562 : * - System columns are not allowed.
563 : *
564 : * NOTES
565 : *
566 : * We don't allow user-defined functions/operators/types/collations because
567 : * (a) if a user drops a user-defined object used in a row filter expression or
568 : * if there is any other error while using it, the logical decoding
569 : * infrastructure won't be able to recover from such an error even if the
570 : * object is recreated again because a historic snapshot is used to evaluate
571 : * the row filter;
572 : * (b) a user-defined function can be used to access tables that could have
573 : * unpleasant results because a historic snapshot is used. That's why only
574 : * immutable built-in functions are allowed in row filter expressions.
575 : *
576 : * We don't allow system columns because currently, we don't have that
577 : * information in the tuple passed to downstream. Also, as we don't replicate
578 : * those to subscribers, there doesn't seem to be a need for a filter on those
579 : * columns.
580 : *
581 : * We can allow other node types after more analysis and testing.
582 : */
583 : static bool
584 720 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
585 : {
586 720 : char *errdetail_msg = NULL;
587 :
588 720 : if (node == NULL)
589 3 : return false;
590 :
591 717 : switch (nodeTag(node))
592 : {
593 194 : case T_Var:
594 : /* System columns are not allowed. */
595 194 : if (((Var *) node)->varattno < InvalidAttrNumber)
596 3 : errdetail_msg = _("System columns are not allowed.");
597 194 : break;
598 196 : case T_OpExpr:
599 : case T_DistinctExpr:
600 : case T_NullIfExpr:
601 : /* OK, except user-defined operators are not allowed. */
602 196 : if (((OpExpr *) node)->opno >= FirstNormalObjectId)
603 3 : errdetail_msg = _("User-defined operators are not allowed.");
604 196 : break;
605 3 : case T_ScalarArrayOpExpr:
606 : /* OK, except user-defined operators are not allowed. */
607 3 : if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
608 0 : errdetail_msg = _("User-defined operators are not allowed.");
609 :
610 : /*
611 : * We don't need to check the hashfuncid and negfuncid of
612 : * ScalarArrayOpExpr as those functions are only built for a
613 : * subquery.
614 : */
615 3 : break;
616 3 : case T_RowCompareExpr:
617 : {
618 : ListCell *opid;
619 :
620 : /* OK, except user-defined operators are not allowed. */
621 9 : foreach(opid, ((RowCompareExpr *) node)->opnos)
622 : {
623 6 : if (lfirst_oid(opid) >= FirstNormalObjectId)
624 : {
625 0 : errdetail_msg = _("User-defined operators are not allowed.");
626 0 : break;
627 : }
628 : }
629 : }
630 3 : break;
631 318 : case T_Const:
632 : case T_FuncExpr:
633 : case T_BoolExpr:
634 : case T_RelabelType:
635 : case T_CollateExpr:
636 : case T_CaseExpr:
637 : case T_CaseTestExpr:
638 : case T_ArrayExpr:
639 : case T_RowExpr:
640 : case T_CoalesceExpr:
641 : case T_MinMaxExpr:
642 : case T_XmlExpr:
643 : case T_NullTest:
644 : case T_BooleanTest:
645 : case T_List:
646 : /* OK, supported */
647 318 : break;
648 3 : default:
649 3 : errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
650 3 : break;
651 : }
652 :
653 : /*
654 : * For all the supported nodes, if we haven't already found a problem,
655 : * check the types, functions, and collations used in it. We check List
656 : * by walking through each element.
657 : */
658 717 : if (!errdetail_msg && !IsA(node, List))
659 : {
660 681 : if (exprType(node) >= FirstNormalObjectId)
661 3 : errdetail_msg = _("User-defined types are not allowed.");
662 678 : else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
663 : pstate))
664 6 : errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
665 1344 : else if (exprCollation(node) >= FirstNormalObjectId ||
666 672 : exprInputCollation(node) >= FirstNormalObjectId)
667 3 : errdetail_msg = _("User-defined collations are not allowed.");
668 : }
669 :
670 : /*
671 : * If we found a problem in this node, throw error now. Otherwise keep
672 : * going.
673 : */
674 717 : if (errdetail_msg)
675 21 : ereport(ERROR,
676 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
677 : errmsg("invalid publication WHERE expression"),
678 : errdetail_internal("%s", errdetail_msg),
679 : parser_errposition(pstate, exprLocation(node))));
680 :
681 696 : return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
682 : pstate);
683 : }
684 :
685 : /*
686 : * Check if the row filter expression is a "simple expression".
687 : *
688 : * See check_simple_rowfilter_expr_walker for details.
689 : */
690 : static bool
691 188 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
692 : {
693 188 : return check_simple_rowfilter_expr_walker(node, pstate);
694 : }
695 :
696 : /*
697 : * Transform the publication WHERE expression for all the relations in the list,
698 : * ensuring it is coerced to boolean and necessary collation information is
699 : * added if required, and add a new nsitem/RTE for the associated relation to
700 : * the ParseState's namespace list.
701 : *
702 : * Also check the publication row filter expression and throw an error if
703 : * anything not permitted or unexpected is encountered.
704 : */
705 : static void
706 593 : TransformPubWhereClauses(List *tables, const char *queryString,
707 : bool pubviaroot)
708 : {
709 : ListCell *lc;
710 :
711 1194 : foreach(lc, tables)
712 : {
713 : ParseNamespaceItem *nsitem;
714 631 : Node *whereclause = NULL;
715 : ParseState *pstate;
716 631 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
717 :
718 631 : if (pri->whereClause == NULL)
719 434 : continue;
720 :
721 : /*
722 : * If the publication doesn't publish changes via the root partitioned
723 : * table, the partition's row filter will be used. So disallow using
724 : * WHERE clause on partitioned table in this case.
725 : */
726 197 : if (!pubviaroot &&
727 182 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
728 3 : ereport(ERROR,
729 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
730 : errmsg("cannot use publication WHERE clause for relation \"%s\"",
731 : RelationGetRelationName(pri->relation)),
732 : errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
733 : "publish_via_partition_root")));
734 :
735 : /*
736 : * A fresh pstate is required so that we only have "this" table in its
737 : * rangetable
738 : */
739 194 : pstate = make_parsestate(NULL);
740 194 : pstate->p_sourcetext = queryString;
741 194 : nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
742 : AccessShareLock, NULL,
743 : false, false);
744 194 : addNSItemToQuery(pstate, nsitem, false, true, true);
745 :
746 194 : whereclause = transformWhereClause(pstate,
747 194 : copyObject(pri->whereClause),
748 : EXPR_KIND_WHERE,
749 : "PUBLICATION WHERE");
750 :
751 : /* Fix up collation information */
752 188 : assign_expr_collations(pstate, whereclause);
753 :
754 188 : whereclause = expand_generated_columns_in_expr(whereclause, pri->relation, 1);
755 :
756 : /*
757 : * We allow only simple expressions in row filters. See
758 : * check_simple_rowfilter_expr_walker.
759 : */
760 188 : check_simple_rowfilter_expr(whereclause, pstate);
761 :
762 167 : free_parsestate(pstate);
763 :
764 167 : pri->whereClause = whereclause;
765 : }
766 563 : }
767 :
768 :
769 : /*
770 : * Given a list of tables that are going to be added to a publication,
771 : * verify that they fulfill the necessary preconditions, namely: no tables
772 : * have a column list if any schema is published; and partitioned tables do
773 : * not have column lists if publish_via_partition_root is not set.
774 : *
775 : * 'publish_schema' indicates that the publication contains any TABLES IN
776 : * SCHEMA elements (newly added in this command, or preexisting).
777 : * 'pubviaroot' is the value of publish_via_partition_root.
778 : */
779 : static void
780 563 : CheckPubRelationColumnList(char *pubname, List *tables,
781 : bool publish_schema, bool pubviaroot)
782 : {
783 : ListCell *lc;
784 :
785 1149 : foreach(lc, tables)
786 : {
787 601 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
788 :
789 601 : if (pri->columns == NIL)
790 399 : continue;
791 :
792 : /*
793 : * Disallow specifying column list if any schema is in the
794 : * publication.
795 : *
796 : * XXX We could instead just forbid the case when the publication
797 : * tries to publish the table with a column list and a schema for that
798 : * table. However, if we do that then we need a restriction during
799 : * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
800 : * seem to be a good idea.
801 : */
802 202 : if (publish_schema)
803 12 : ereport(ERROR,
804 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
805 : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
806 : get_namespace_name(RelationGetNamespace(pri->relation)),
807 : RelationGetRelationName(pri->relation), pubname),
808 : errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
809 :
810 : /*
811 : * If the publication doesn't publish changes via the root partitioned
812 : * table, the partition's column list will be used. So disallow using
813 : * a column list on the partitioned table in this case.
814 : */
815 190 : if (!pubviaroot &&
816 152 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
817 3 : ereport(ERROR,
818 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
819 : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
820 : get_namespace_name(RelationGetNamespace(pri->relation)),
821 : RelationGetRelationName(pri->relation), pubname),
822 : errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
823 : "publish_via_partition_root")));
824 : }
825 548 : }
826 :
827 : /*
828 : * Create new publication.
829 : */
830 : ObjectAddress
831 448 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
832 : {
833 : Relation rel;
834 : ObjectAddress myself;
835 : Oid puboid;
836 : bool nulls[Natts_pg_publication];
837 : Datum values[Natts_pg_publication];
838 : HeapTuple tup;
839 : bool publish_given;
840 : PublicationActions pubactions;
841 : bool publish_via_partition_root_given;
842 : bool publish_via_partition_root;
843 : bool publish_generated_columns_given;
844 : char publish_generated_columns;
845 : AclResult aclresult;
846 448 : List *relations = NIL;
847 448 : List *schemaidlist = NIL;
848 :
849 : /* must have CREATE privilege on database */
850 448 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
851 448 : if (aclresult != ACLCHECK_OK)
852 3 : aclcheck_error(aclresult, OBJECT_DATABASE,
853 3 : get_database_name(MyDatabaseId));
854 :
855 : /* FOR ALL TABLES and FOR ALL SEQUENCES requires superuser */
856 445 : if (!superuser())
857 : {
858 12 : if (stmt->for_all_tables || stmt->for_all_sequences)
859 0 : ereport(ERROR,
860 : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
861 : errmsg("must be superuser to create a FOR ALL TABLES or ALL SEQUENCES publication"));
862 : }
863 :
864 445 : rel = table_open(PublicationRelationId, RowExclusiveLock);
865 :
866 : /* Check if name is used */
867 445 : puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
868 : CStringGetDatum(stmt->pubname));
869 445 : if (OidIsValid(puboid))
870 3 : ereport(ERROR,
871 : (errcode(ERRCODE_DUPLICATE_OBJECT),
872 : errmsg("publication \"%s\" already exists",
873 : stmt->pubname)));
874 :
875 : /* Form a tuple. */
876 442 : memset(values, 0, sizeof(values));
877 442 : memset(nulls, false, sizeof(nulls));
878 :
879 442 : values[Anum_pg_publication_pubname - 1] =
880 442 : DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
881 442 : values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
882 :
883 442 : parse_publication_options(pstate,
884 : stmt->options,
885 : &publish_given, &pubactions,
886 : &publish_via_partition_root_given,
887 : &publish_via_partition_root,
888 : &publish_generated_columns_given,
889 : &publish_generated_columns);
890 :
891 424 : if (stmt->for_all_sequences &&
892 14 : (publish_given || publish_via_partition_root_given ||
893 : publish_generated_columns_given))
894 1 : ereport(NOTICE,
895 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
896 : errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
897 :
898 424 : puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
899 : Anum_pg_publication_oid);
900 424 : values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
901 424 : values[Anum_pg_publication_puballtables - 1] =
902 424 : BoolGetDatum(stmt->for_all_tables);
903 424 : values[Anum_pg_publication_puballsequences - 1] =
904 424 : BoolGetDatum(stmt->for_all_sequences);
905 424 : values[Anum_pg_publication_pubinsert - 1] =
906 424 : BoolGetDatum(pubactions.pubinsert);
907 424 : values[Anum_pg_publication_pubupdate - 1] =
908 424 : BoolGetDatum(pubactions.pubupdate);
909 424 : values[Anum_pg_publication_pubdelete - 1] =
910 424 : BoolGetDatum(pubactions.pubdelete);
911 424 : values[Anum_pg_publication_pubtruncate - 1] =
912 424 : BoolGetDatum(pubactions.pubtruncate);
913 424 : values[Anum_pg_publication_pubviaroot - 1] =
914 424 : BoolGetDatum(publish_via_partition_root);
915 424 : values[Anum_pg_publication_pubgencols - 1] =
916 424 : CharGetDatum(publish_generated_columns);
917 :
918 424 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
919 :
920 : /* Insert tuple into catalog. */
921 424 : CatalogTupleInsert(rel, tup);
922 424 : heap_freetuple(tup);
923 :
924 424 : recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
925 :
926 424 : ObjectAddressSet(myself, PublicationRelationId, puboid);
927 :
928 : /* Make the changes visible. */
929 424 : CommandCounterIncrement();
930 :
931 : /* Associate objects with the publication. */
932 424 : if (stmt->for_all_tables)
933 : {
934 : /*
935 : * Invalidate relcache so that publication info is rebuilt. Sequences
936 : * publication doesn't require invalidation, as replica identity
937 : * checks don't apply to them.
938 : */
939 53 : CacheInvalidateRelcacheAll();
940 : }
941 371 : else if (!stmt->for_all_sequences)
942 : {
943 361 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
944 : &schemaidlist);
945 :
946 : /* FOR TABLES IN SCHEMA requires superuser */
947 352 : if (schemaidlist != NIL && !superuser())
948 3 : ereport(ERROR,
949 : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
950 : errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
951 :
952 349 : if (relations != NIL)
953 : {
954 : List *rels;
955 :
956 237 : rels = OpenTableList(relations);
957 222 : TransformPubWhereClauses(rels, pstate->p_sourcetext,
958 : publish_via_partition_root);
959 :
960 210 : CheckPubRelationColumnList(stmt->pubname, rels,
961 : schemaidlist != NIL,
962 : publish_via_partition_root);
963 :
964 207 : PublicationAddTables(puboid, rels, true, NULL);
965 194 : CloseTableList(rels);
966 : }
967 :
968 306 : if (schemaidlist != NIL)
969 : {
970 : /*
971 : * Schema lock is held until the publication is created to prevent
972 : * concurrent schema deletion.
973 : */
974 76 : LockSchemaList(schemaidlist);
975 76 : PublicationAddSchemas(puboid, schemaidlist, true, NULL);
976 : }
977 : }
978 :
979 366 : table_close(rel, RowExclusiveLock);
980 :
981 366 : InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
982 :
983 : /*
984 : * We don't need this warning message when wal_level >= 'replica' since
985 : * logical decoding is automatically enabled up on a logical slot
986 : * creation.
987 : */
988 366 : if (wal_level < WAL_LEVEL_REPLICA)
989 13 : ereport(WARNING,
990 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
991 : errmsg("logical decoding must be enabled to publish logical changes"),
992 : errhint("Before creating subscriptions, ensure that \"wal_level\" is set to \"replica\" or higher.")));
993 :
994 366 : return myself;
995 : }
996 :
997 : /*
998 : * Change options of a publication.
999 : */
1000 : static void
1001 64 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
1002 : Relation rel, HeapTuple tup)
1003 : {
1004 : bool nulls[Natts_pg_publication];
1005 : bool replaces[Natts_pg_publication];
1006 : Datum values[Natts_pg_publication];
1007 : bool publish_given;
1008 : PublicationActions pubactions;
1009 : bool publish_via_partition_root_given;
1010 : bool publish_via_partition_root;
1011 : bool publish_generated_columns_given;
1012 : char publish_generated_columns;
1013 : ObjectAddress obj;
1014 : Form_pg_publication pubform;
1015 64 : List *root_relids = NIL;
1016 : ListCell *lc;
1017 :
1018 64 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1019 :
1020 64 : parse_publication_options(pstate,
1021 : stmt->options,
1022 : &publish_given, &pubactions,
1023 : &publish_via_partition_root_given,
1024 : &publish_via_partition_root,
1025 : &publish_generated_columns_given,
1026 : &publish_generated_columns);
1027 :
1028 64 : if (pubform->puballsequences &&
1029 6 : (publish_given || publish_via_partition_root_given ||
1030 : publish_generated_columns_given))
1031 6 : ereport(NOTICE,
1032 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1033 : errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
1034 :
1035 : /*
1036 : * If the publication doesn't publish changes via the root partitioned
1037 : * table, the partition's row filter and column list will be used. So
1038 : * disallow using WHERE clause and column lists on partitioned table in
1039 : * this case.
1040 : */
1041 64 : if (!pubform->puballtables && publish_via_partition_root_given &&
1042 40 : !publish_via_partition_root)
1043 : {
1044 : /*
1045 : * Lock the publication so nobody else can do anything with it. This
1046 : * prevents concurrent alter to add partitioned table(s) with WHERE
1047 : * clause(s) and/or column lists which we don't allow when not
1048 : * publishing via root.
1049 : */
1050 24 : LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
1051 : AccessShareLock);
1052 :
1053 24 : root_relids = GetPublicationRelations(pubform->oid,
1054 : PUBLICATION_PART_ROOT);
1055 :
1056 42 : foreach(lc, root_relids)
1057 : {
1058 24 : Oid relid = lfirst_oid(lc);
1059 : HeapTuple rftuple;
1060 : char relkind;
1061 : char *relname;
1062 : bool has_rowfilter;
1063 : bool has_collist;
1064 :
1065 : /*
1066 : * Beware: we don't have lock on the relations, so cope silently
1067 : * with the cache lookups returning NULL.
1068 : */
1069 :
1070 24 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1071 : ObjectIdGetDatum(relid),
1072 : ObjectIdGetDatum(pubform->oid));
1073 24 : if (!HeapTupleIsValid(rftuple))
1074 0 : continue;
1075 24 : has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
1076 24 : has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
1077 24 : if (!has_rowfilter && !has_collist)
1078 : {
1079 6 : ReleaseSysCache(rftuple);
1080 6 : continue;
1081 : }
1082 :
1083 18 : relkind = get_rel_relkind(relid);
1084 18 : if (relkind != RELKIND_PARTITIONED_TABLE)
1085 : {
1086 12 : ReleaseSysCache(rftuple);
1087 12 : continue;
1088 : }
1089 6 : relname = get_rel_name(relid);
1090 6 : if (relname == NULL) /* table concurrently dropped */
1091 : {
1092 0 : ReleaseSysCache(rftuple);
1093 0 : continue;
1094 : }
1095 :
1096 6 : if (has_rowfilter)
1097 3 : ereport(ERROR,
1098 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1099 : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1100 : "publish_via_partition_root",
1101 : stmt->pubname),
1102 : errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1103 : relname, "publish_via_partition_root")));
1104 : Assert(has_collist);
1105 3 : ereport(ERROR,
1106 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1107 : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1108 : "publish_via_partition_root",
1109 : stmt->pubname),
1110 : errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1111 : relname, "publish_via_partition_root")));
1112 : }
1113 : }
1114 :
1115 : /* Everything ok, form a new tuple. */
1116 58 : memset(values, 0, sizeof(values));
1117 58 : memset(nulls, false, sizeof(nulls));
1118 58 : memset(replaces, false, sizeof(replaces));
1119 :
1120 58 : if (publish_given)
1121 : {
1122 17 : values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
1123 17 : replaces[Anum_pg_publication_pubinsert - 1] = true;
1124 :
1125 17 : values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
1126 17 : replaces[Anum_pg_publication_pubupdate - 1] = true;
1127 :
1128 17 : values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
1129 17 : replaces[Anum_pg_publication_pubdelete - 1] = true;
1130 :
1131 17 : values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
1132 17 : replaces[Anum_pg_publication_pubtruncate - 1] = true;
1133 : }
1134 :
1135 58 : if (publish_via_partition_root_given)
1136 : {
1137 35 : values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
1138 35 : replaces[Anum_pg_publication_pubviaroot - 1] = true;
1139 : }
1140 :
1141 58 : if (publish_generated_columns_given)
1142 : {
1143 6 : values[Anum_pg_publication_pubgencols - 1] = CharGetDatum(publish_generated_columns);
1144 6 : replaces[Anum_pg_publication_pubgencols - 1] = true;
1145 : }
1146 :
1147 58 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1148 : replaces);
1149 :
1150 : /* Update the catalog. */
1151 58 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1152 :
1153 58 : CommandCounterIncrement();
1154 :
1155 58 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1156 :
1157 : /* Invalidate the relcache. */
1158 58 : if (pubform->puballtables)
1159 : {
1160 10 : CacheInvalidateRelcacheAll();
1161 : }
1162 : else
1163 : {
1164 48 : List *relids = NIL;
1165 48 : List *schemarelids = NIL;
1166 :
1167 : /*
1168 : * For any partitioned tables contained in the publication, we must
1169 : * invalidate all partitions contained in the respective partition
1170 : * trees, not just those explicitly mentioned in the publication.
1171 : */
1172 48 : if (root_relids == NIL)
1173 30 : relids = GetPublicationRelations(pubform->oid,
1174 : PUBLICATION_PART_ALL);
1175 : else
1176 : {
1177 : /*
1178 : * We already got tables explicitly mentioned in the publication.
1179 : * Now get all partitions for the partitioned table in the list.
1180 : */
1181 36 : foreach(lc, root_relids)
1182 18 : relids = GetPubPartitionOptionRelations(relids,
1183 : PUBLICATION_PART_ALL,
1184 : lfirst_oid(lc));
1185 : }
1186 :
1187 48 : schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
1188 : PUBLICATION_PART_ALL);
1189 48 : relids = list_concat_unique_oid(relids, schemarelids);
1190 :
1191 48 : InvalidatePublicationRels(relids);
1192 : }
1193 :
1194 58 : ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
1195 58 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1196 : (Node *) stmt);
1197 :
1198 58 : InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
1199 58 : }
1200 :
1201 : /*
1202 : * Invalidate the relations.
1203 : */
1204 : void
1205 1225 : InvalidatePublicationRels(List *relids)
1206 : {
1207 : /*
1208 : * We don't want to send too many individual messages, at some point it's
1209 : * cheaper to just reset whole relcache.
1210 : */
1211 1225 : if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
1212 : {
1213 : ListCell *lc;
1214 :
1215 10337 : foreach(lc, relids)
1216 9112 : CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
1217 : }
1218 : else
1219 0 : CacheInvalidateRelcacheAll();
1220 1225 : }
1221 :
1222 : /*
1223 : * Add or remove table to/from publication.
1224 : */
1225 : static void
1226 459 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
1227 : List *tables, const char *queryString,
1228 : bool publish_schema)
1229 : {
1230 459 : List *rels = NIL;
1231 459 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1232 459 : Oid pubid = pubform->oid;
1233 :
1234 : /*
1235 : * Nothing to do if no objects, except in SET: for that it is quite
1236 : * possible that user has not specified any tables in which case we need
1237 : * to remove all the existing tables.
1238 : */
1239 459 : if (!tables && stmt->action != AP_SetObjects)
1240 38 : return;
1241 :
1242 421 : rels = OpenTableList(tables);
1243 :
1244 421 : if (stmt->action == AP_AddObjects)
1245 : {
1246 148 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1247 :
1248 139 : publish_schema |= is_schema_publication(pubid);
1249 :
1250 139 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1251 139 : pubform->pubviaroot);
1252 :
1253 133 : PublicationAddTables(pubid, rels, false, stmt);
1254 : }
1255 273 : else if (stmt->action == AP_DropObjects)
1256 50 : PublicationDropTables(pubid, rels, false);
1257 : else /* AP_SetObjects */
1258 : {
1259 223 : List *oldrelids = GetPublicationRelations(pubid,
1260 : PUBLICATION_PART_ROOT);
1261 223 : List *delrels = NIL;
1262 : ListCell *oldlc;
1263 :
1264 223 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1265 :
1266 214 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1267 214 : pubform->pubviaroot);
1268 :
1269 : /*
1270 : * To recreate the relation list for the publication, look for
1271 : * existing relations that do not need to be dropped.
1272 : */
1273 403 : foreach(oldlc, oldrelids)
1274 : {
1275 201 : Oid oldrelid = lfirst_oid(oldlc);
1276 : ListCell *newlc;
1277 : PublicationRelInfo *oldrel;
1278 201 : bool found = false;
1279 : HeapTuple rftuple;
1280 201 : Node *oldrelwhereclause = NULL;
1281 201 : Bitmapset *oldcolumns = NULL;
1282 :
1283 : /* look up the cache for the old relmap */
1284 201 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1285 : ObjectIdGetDatum(oldrelid),
1286 : ObjectIdGetDatum(pubid));
1287 :
1288 : /*
1289 : * See if the existing relation currently has a WHERE clause or a
1290 : * column list. We need to compare those too.
1291 : */
1292 201 : if (HeapTupleIsValid(rftuple))
1293 : {
1294 201 : bool isnull = true;
1295 : Datum whereClauseDatum;
1296 : Datum columnListDatum;
1297 :
1298 : /* Load the WHERE clause for this table. */
1299 201 : whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1300 : Anum_pg_publication_rel_prqual,
1301 : &isnull);
1302 201 : if (!isnull)
1303 105 : oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
1304 :
1305 : /* Transform the int2vector column list to a bitmap. */
1306 201 : columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1307 : Anum_pg_publication_rel_prattrs,
1308 : &isnull);
1309 :
1310 201 : if (!isnull)
1311 68 : oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
1312 :
1313 201 : ReleaseSysCache(rftuple);
1314 : }
1315 :
1316 389 : foreach(newlc, rels)
1317 : {
1318 : PublicationRelInfo *newpubrel;
1319 : Oid newrelid;
1320 202 : Bitmapset *newcolumns = NULL;
1321 :
1322 202 : newpubrel = (PublicationRelInfo *) lfirst(newlc);
1323 202 : newrelid = RelationGetRelid(newpubrel->relation);
1324 :
1325 : /*
1326 : * Validate the column list. If the column list or WHERE
1327 : * clause changes, then the validation done here will be
1328 : * duplicated inside PublicationAddTables(). The validation
1329 : * is cheap enough that that seems harmless.
1330 : */
1331 202 : newcolumns = pub_collist_validate(newpubrel->relation,
1332 : newpubrel->columns);
1333 :
1334 : /*
1335 : * Check if any of the new set of relations matches with the
1336 : * existing relations in the publication. Additionally, if the
1337 : * relation has an associated WHERE clause, check the WHERE
1338 : * expressions also match. Same for the column list. Drop the
1339 : * rest.
1340 : */
1341 196 : if (newrelid == oldrelid)
1342 : {
1343 142 : if (equal(oldrelwhereclause, newpubrel->whereClause) &&
1344 40 : bms_equal(oldcolumns, newcolumns))
1345 : {
1346 8 : found = true;
1347 8 : break;
1348 : }
1349 : }
1350 : }
1351 :
1352 : /*
1353 : * Add the non-matched relations to a list so that they can be
1354 : * dropped.
1355 : */
1356 195 : if (!found)
1357 : {
1358 187 : oldrel = palloc_object(PublicationRelInfo);
1359 187 : oldrel->whereClause = NULL;
1360 187 : oldrel->columns = NIL;
1361 187 : oldrel->relation = table_open(oldrelid,
1362 : ShareUpdateExclusiveLock);
1363 187 : delrels = lappend(delrels, oldrel);
1364 : }
1365 : }
1366 :
1367 : /* And drop them. */
1368 202 : PublicationDropTables(pubid, delrels, true);
1369 :
1370 : /*
1371 : * Don't bother calculating the difference for adding, we'll catch and
1372 : * skip existing ones when doing catalog update.
1373 : */
1374 202 : PublicationAddTables(pubid, rels, true, stmt);
1375 :
1376 202 : CloseTableList(delrels);
1377 : }
1378 :
1379 355 : CloseTableList(rels);
1380 : }
1381 :
1382 : /*
1383 : * Alter the publication schemas.
1384 : *
1385 : * Add or remove schemas to/from publication.
1386 : */
1387 : static void
1388 393 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
1389 : HeapTuple tup, List *schemaidlist)
1390 : {
1391 393 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1392 :
1393 : /*
1394 : * Nothing to do if no objects, except in SET: for that it is quite
1395 : * possible that user has not specified any schemas in which case we need
1396 : * to remove all the existing schemas.
1397 : */
1398 393 : if (!schemaidlist && stmt->action != AP_SetObjects)
1399 153 : return;
1400 :
1401 : /*
1402 : * Schema lock is held until the publication is altered to prevent
1403 : * concurrent schema deletion.
1404 : */
1405 240 : LockSchemaList(schemaidlist);
1406 240 : if (stmt->action == AP_AddObjects)
1407 : {
1408 : ListCell *lc;
1409 : List *reloids;
1410 :
1411 19 : reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
1412 :
1413 28 : foreach(lc, reloids)
1414 : {
1415 : HeapTuple coltuple;
1416 :
1417 12 : coltuple = SearchSysCache2(PUBLICATIONRELMAP,
1418 : ObjectIdGetDatum(lfirst_oid(lc)),
1419 : ObjectIdGetDatum(pubform->oid));
1420 :
1421 12 : if (!HeapTupleIsValid(coltuple))
1422 0 : continue;
1423 :
1424 : /*
1425 : * Disallow adding schema if column list is already part of the
1426 : * publication. See CheckPubRelationColumnList.
1427 : */
1428 12 : if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
1429 3 : ereport(ERROR,
1430 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1431 : errmsg("cannot add schema to publication \"%s\"",
1432 : stmt->pubname),
1433 : errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1434 :
1435 9 : ReleaseSysCache(coltuple);
1436 : }
1437 :
1438 16 : PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
1439 : }
1440 221 : else if (stmt->action == AP_DropObjects)
1441 19 : PublicationDropSchemas(pubform->oid, schemaidlist, false);
1442 : else /* AP_SetObjects */
1443 : {
1444 202 : List *oldschemaids = GetPublicationSchemas(pubform->oid);
1445 202 : List *delschemas = NIL;
1446 :
1447 : /* Identify which schemas should be dropped */
1448 202 : delschemas = list_difference_oid(oldschemaids, schemaidlist);
1449 :
1450 : /*
1451 : * Schema lock is held until the publication is altered to prevent
1452 : * concurrent schema deletion.
1453 : */
1454 202 : LockSchemaList(delschemas);
1455 :
1456 : /* And drop them */
1457 202 : PublicationDropSchemas(pubform->oid, delschemas, true);
1458 :
1459 : /*
1460 : * Don't bother calculating the difference for adding, we'll catch and
1461 : * skip existing ones when doing catalog update.
1462 : */
1463 202 : PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
1464 : }
1465 : }
1466 :
1467 : /*
1468 : * Check if relations and schemas can be in a given publication and throw
1469 : * appropriate error if not.
1470 : */
1471 : static void
1472 480 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
1473 : List *tables, List *schemaidlist)
1474 : {
1475 480 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1476 :
1477 480 : if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
1478 52 : schemaidlist && !superuser())
1479 3 : ereport(ERROR,
1480 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1481 : errmsg("must be superuser to add or set schemas")));
1482 :
1483 : /*
1484 : * Check that user is allowed to manipulate the publication tables in
1485 : * schema
1486 : */
1487 477 : if (schemaidlist && (pubform->puballtables || pubform->puballsequences))
1488 : {
1489 9 : if (pubform->puballtables && pubform->puballsequences)
1490 0 : ereport(ERROR,
1491 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1492 : errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
1493 : NameStr(pubform->pubname)),
1494 : errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
1495 9 : else if (pubform->puballtables)
1496 9 : ereport(ERROR,
1497 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1498 : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1499 : NameStr(pubform->pubname)),
1500 : errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications."));
1501 : else
1502 0 : ereport(ERROR,
1503 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1504 : errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
1505 : NameStr(pubform->pubname)),
1506 : errdetail("Schemas cannot be added to or dropped from FOR ALL SEQUENCES publications."));
1507 : }
1508 :
1509 : /* Check that user is allowed to manipulate the publication tables. */
1510 468 : if (tables && (pubform->puballtables || pubform->puballsequences))
1511 : {
1512 9 : if (pubform->puballtables && pubform->puballsequences)
1513 0 : ereport(ERROR,
1514 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1515 : errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
1516 : NameStr(pubform->pubname)),
1517 : errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
1518 9 : else if (pubform->puballtables)
1519 9 : ereport(ERROR,
1520 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1521 : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1522 : NameStr(pubform->pubname)),
1523 : errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES publications."));
1524 : else
1525 0 : ereport(ERROR,
1526 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1527 : errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
1528 : NameStr(pubform->pubname)),
1529 : errdetail("Tables or sequences cannot be added to or dropped from FOR ALL SEQUENCES publications."));
1530 : }
1531 459 : }
1532 :
1533 : /*
1534 : * Alter the existing publication.
1535 : *
1536 : * This is dispatcher function for AlterPublicationOptions,
1537 : * AlterPublicationSchemas and AlterPublicationTables.
1538 : */
1539 : void
1540 553 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
1541 : {
1542 : Relation rel;
1543 : HeapTuple tup;
1544 : Form_pg_publication pubform;
1545 :
1546 553 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1547 :
1548 553 : tup = SearchSysCacheCopy1(PUBLICATIONNAME,
1549 : CStringGetDatum(stmt->pubname));
1550 :
1551 553 : if (!HeapTupleIsValid(tup))
1552 0 : ereport(ERROR,
1553 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1554 : errmsg("publication \"%s\" does not exist",
1555 : stmt->pubname)));
1556 :
1557 553 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1558 :
1559 : /* must be owner */
1560 553 : if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
1561 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
1562 0 : stmt->pubname);
1563 :
1564 553 : if (stmt->options)
1565 64 : AlterPublicationOptions(pstate, stmt, rel, tup);
1566 : else
1567 : {
1568 489 : List *relations = NIL;
1569 489 : List *schemaidlist = NIL;
1570 489 : Oid pubid = pubform->oid;
1571 :
1572 489 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
1573 : &schemaidlist);
1574 :
1575 480 : CheckAlterPublication(stmt, tup, relations, schemaidlist);
1576 :
1577 459 : heap_freetuple(tup);
1578 :
1579 : /* Lock the publication so nobody else can do anything with it. */
1580 459 : LockDatabaseObject(PublicationRelationId, pubid, 0,
1581 : AccessExclusiveLock);
1582 :
1583 : /*
1584 : * It is possible that by the time we acquire the lock on publication,
1585 : * concurrent DDL has removed it. We can test this by checking the
1586 : * existence of publication. We get the tuple again to avoid the risk
1587 : * of any publication option getting changed.
1588 : */
1589 459 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1590 459 : if (!HeapTupleIsValid(tup))
1591 0 : ereport(ERROR,
1592 : errcode(ERRCODE_UNDEFINED_OBJECT),
1593 : errmsg("publication \"%s\" does not exist",
1594 : stmt->pubname));
1595 :
1596 459 : AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
1597 : schemaidlist != NIL);
1598 393 : AlterPublicationSchemas(stmt, tup, schemaidlist);
1599 : }
1600 :
1601 : /* Cleanup. */
1602 442 : heap_freetuple(tup);
1603 442 : table_close(rel, RowExclusiveLock);
1604 442 : }
1605 :
1606 : /*
1607 : * Remove relation from publication by mapping OID.
1608 : */
1609 : void
1610 424 : RemovePublicationRelById(Oid proid)
1611 : {
1612 : Relation rel;
1613 : HeapTuple tup;
1614 : Form_pg_publication_rel pubrel;
1615 424 : List *relids = NIL;
1616 :
1617 424 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
1618 :
1619 424 : tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
1620 :
1621 424 : if (!HeapTupleIsValid(tup))
1622 0 : elog(ERROR, "cache lookup failed for publication table %u",
1623 : proid);
1624 :
1625 424 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1626 :
1627 : /*
1628 : * Invalidate relcache so that publication info is rebuilt.
1629 : *
1630 : * For the partitioned tables, we must invalidate all partitions contained
1631 : * in the respective partition hierarchies, not just the one explicitly
1632 : * mentioned in the publication. This is required because we implicitly
1633 : * publish the child tables when the parent table is published.
1634 : */
1635 424 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
1636 : pubrel->prrelid);
1637 :
1638 424 : InvalidatePublicationRels(relids);
1639 :
1640 424 : CatalogTupleDelete(rel, &tup->t_self);
1641 :
1642 424 : ReleaseSysCache(tup);
1643 :
1644 424 : table_close(rel, RowExclusiveLock);
1645 424 : }
1646 :
1647 : /*
1648 : * Remove the publication by mapping OID.
1649 : */
1650 : void
1651 250 : RemovePublicationById(Oid pubid)
1652 : {
1653 : Relation rel;
1654 : HeapTuple tup;
1655 : Form_pg_publication pubform;
1656 :
1657 250 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1658 :
1659 250 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1660 250 : if (!HeapTupleIsValid(tup))
1661 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1662 :
1663 250 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1664 :
1665 : /* Invalidate relcache so that publication info is rebuilt. */
1666 250 : if (pubform->puballtables)
1667 35 : CacheInvalidateRelcacheAll();
1668 :
1669 250 : CatalogTupleDelete(rel, &tup->t_self);
1670 :
1671 250 : ReleaseSysCache(tup);
1672 :
1673 250 : table_close(rel, RowExclusiveLock);
1674 250 : }
1675 :
1676 : /*
1677 : * Remove schema from publication by mapping OID.
1678 : */
1679 : void
1680 96 : RemovePublicationSchemaById(Oid psoid)
1681 : {
1682 : Relation rel;
1683 : HeapTuple tup;
1684 96 : List *schemaRels = NIL;
1685 : Form_pg_publication_namespace pubsch;
1686 :
1687 96 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
1688 :
1689 96 : tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
1690 :
1691 96 : if (!HeapTupleIsValid(tup))
1692 0 : elog(ERROR, "cache lookup failed for publication schema %u", psoid);
1693 :
1694 96 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1695 :
1696 : /*
1697 : * Invalidate relcache so that publication info is rebuilt. See
1698 : * RemovePublicationRelById for why we need to consider all the
1699 : * partitions.
1700 : */
1701 96 : schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
1702 : PUBLICATION_PART_ALL);
1703 96 : InvalidatePublicationRels(schemaRels);
1704 :
1705 96 : CatalogTupleDelete(rel, &tup->t_self);
1706 :
1707 96 : ReleaseSysCache(tup);
1708 :
1709 96 : table_close(rel, RowExclusiveLock);
1710 96 : }
1711 :
1712 : /*
1713 : * Open relations specified by a PublicationTable list.
1714 : * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1715 : * add them to a publication.
1716 : */
1717 : static List *
1718 658 : OpenTableList(List *tables)
1719 : {
1720 658 : List *relids = NIL;
1721 658 : List *rels = NIL;
1722 : ListCell *lc;
1723 658 : List *relids_with_rf = NIL;
1724 658 : List *relids_with_collist = NIL;
1725 :
1726 : /*
1727 : * Open, share-lock, and check all the explicitly-specified relations
1728 : */
1729 1350 : foreach(lc, tables)
1730 : {
1731 707 : PublicationTable *t = lfirst_node(PublicationTable, lc);
1732 707 : bool recurse = t->relation->inh;
1733 : Relation rel;
1734 : Oid myrelid;
1735 : PublicationRelInfo *pub_rel;
1736 :
1737 : /* Allow query cancel in case this takes a long time */
1738 707 : CHECK_FOR_INTERRUPTS();
1739 :
1740 707 : rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
1741 704 : myrelid = RelationGetRelid(rel);
1742 :
1743 : /*
1744 : * Filter out duplicates if user specifies "foo, foo".
1745 : *
1746 : * Note that this algorithm is known to not be very efficient (O(N^2))
1747 : * but given that it only works on list of tables given to us by user
1748 : * it's deemed acceptable.
1749 : */
1750 704 : if (list_member_oid(relids, myrelid))
1751 : {
1752 : /* Disallow duplicate tables if there are any with row filters. */
1753 12 : if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
1754 6 : ereport(ERROR,
1755 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1756 : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1757 : RelationGetRelationName(rel))));
1758 :
1759 : /* Disallow duplicate tables if there are any with column lists. */
1760 6 : if (t->columns || list_member_oid(relids_with_collist, myrelid))
1761 6 : ereport(ERROR,
1762 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1763 : errmsg("conflicting or redundant column lists for table \"%s\"",
1764 : RelationGetRelationName(rel))));
1765 :
1766 0 : table_close(rel, ShareUpdateExclusiveLock);
1767 0 : continue;
1768 : }
1769 :
1770 692 : pub_rel = palloc_object(PublicationRelInfo);
1771 692 : pub_rel->relation = rel;
1772 692 : pub_rel->whereClause = t->whereClause;
1773 692 : pub_rel->columns = t->columns;
1774 692 : rels = lappend(rels, pub_rel);
1775 692 : relids = lappend_oid(relids, myrelid);
1776 :
1777 692 : if (t->whereClause)
1778 202 : relids_with_rf = lappend_oid(relids_with_rf, myrelid);
1779 :
1780 692 : if (t->columns)
1781 205 : relids_with_collist = lappend_oid(relids_with_collist, myrelid);
1782 :
1783 : /*
1784 : * Add children of this rel, if requested, so that they too are added
1785 : * to the publication. A partitioned table can't have any inheritance
1786 : * children other than its partitions, which need not be explicitly
1787 : * added to the publication.
1788 : */
1789 692 : if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1790 : {
1791 : List *children;
1792 : ListCell *child;
1793 :
1794 580 : children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
1795 : NULL);
1796 :
1797 1164 : foreach(child, children)
1798 : {
1799 584 : Oid childrelid = lfirst_oid(child);
1800 :
1801 : /* Allow query cancel in case this takes a long time */
1802 584 : CHECK_FOR_INTERRUPTS();
1803 :
1804 : /*
1805 : * Skip duplicates if user specified both parent and child
1806 : * tables.
1807 : */
1808 584 : if (list_member_oid(relids, childrelid))
1809 : {
1810 : /*
1811 : * We don't allow to specify row filter for both parent
1812 : * and child table at the same time as it is not very
1813 : * clear which one should be given preference.
1814 : */
1815 580 : if (childrelid != myrelid &&
1816 0 : (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
1817 0 : ereport(ERROR,
1818 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1819 : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1820 : RelationGetRelationName(rel))));
1821 :
1822 : /*
1823 : * We don't allow to specify column list for both parent
1824 : * and child table at the same time as it is not very
1825 : * clear which one should be given preference.
1826 : */
1827 580 : if (childrelid != myrelid &&
1828 0 : (t->columns || list_member_oid(relids_with_collist, childrelid)))
1829 0 : ereport(ERROR,
1830 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1831 : errmsg("conflicting or redundant column lists for table \"%s\"",
1832 : RelationGetRelationName(rel))));
1833 :
1834 580 : continue;
1835 : }
1836 :
1837 : /* find_all_inheritors already got lock */
1838 4 : rel = table_open(childrelid, NoLock);
1839 4 : pub_rel = palloc_object(PublicationRelInfo);
1840 4 : pub_rel->relation = rel;
1841 : /* child inherits WHERE clause from parent */
1842 4 : pub_rel->whereClause = t->whereClause;
1843 :
1844 : /* child inherits column list from parent */
1845 4 : pub_rel->columns = t->columns;
1846 4 : rels = lappend(rels, pub_rel);
1847 4 : relids = lappend_oid(relids, childrelid);
1848 :
1849 4 : if (t->whereClause)
1850 1 : relids_with_rf = lappend_oid(relids_with_rf, childrelid);
1851 :
1852 4 : if (t->columns)
1853 0 : relids_with_collist = lappend_oid(relids_with_collist, childrelid);
1854 : }
1855 : }
1856 : }
1857 :
1858 643 : list_free(relids);
1859 643 : list_free(relids_with_rf);
1860 :
1861 643 : return rels;
1862 : }
1863 :
1864 : /*
1865 : * Close all relations in the list.
1866 : */
1867 : static void
1868 751 : CloseTableList(List *rels)
1869 : {
1870 : ListCell *lc;
1871 :
1872 1528 : foreach(lc, rels)
1873 : {
1874 : PublicationRelInfo *pub_rel;
1875 :
1876 777 : pub_rel = (PublicationRelInfo *) lfirst(lc);
1877 777 : table_close(pub_rel->relation, NoLock);
1878 : }
1879 :
1880 751 : list_free_deep(rels);
1881 751 : }
1882 :
1883 : /*
1884 : * Lock the schemas specified in the schema list in AccessShareLock mode in
1885 : * order to prevent concurrent schema deletion.
1886 : */
1887 : static void
1888 518 : LockSchemaList(List *schemalist)
1889 : {
1890 : ListCell *lc;
1891 :
1892 680 : foreach(lc, schemalist)
1893 : {
1894 162 : Oid schemaid = lfirst_oid(lc);
1895 :
1896 : /* Allow query cancel in case this takes a long time */
1897 162 : CHECK_FOR_INTERRUPTS();
1898 162 : LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
1899 :
1900 : /*
1901 : * It is possible that by the time we acquire the lock on schema,
1902 : * concurrent DDL has removed it. We can test this by checking the
1903 : * existence of schema.
1904 : */
1905 162 : if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
1906 0 : ereport(ERROR,
1907 : errcode(ERRCODE_UNDEFINED_SCHEMA),
1908 : errmsg("schema with OID %u does not exist", schemaid));
1909 : }
1910 518 : }
1911 :
1912 : /*
1913 : * Add listed tables to the publication.
1914 : */
1915 : static void
1916 542 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
1917 : AlterPublicationStmt *stmt)
1918 : {
1919 : ListCell *lc;
1920 :
1921 1088 : foreach(lc, rels)
1922 : {
1923 580 : PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
1924 580 : Relation rel = pub_rel->relation;
1925 : ObjectAddress obj;
1926 :
1927 : /* Must be owner of the table or superuser. */
1928 580 : if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
1929 3 : aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
1930 3 : RelationGetRelationName(rel));
1931 :
1932 577 : obj = publication_add_relation(pubid, pub_rel, if_not_exists);
1933 546 : if (stmt)
1934 : {
1935 314 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1936 : (Node *) stmt);
1937 :
1938 314 : InvokeObjectPostCreateHook(PublicationRelRelationId,
1939 : obj.objectId, 0);
1940 : }
1941 : }
1942 508 : }
1943 :
1944 : /*
1945 : * Remove listed tables from the publication.
1946 : */
1947 : static void
1948 252 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
1949 : {
1950 : ObjectAddress obj;
1951 : ListCell *lc;
1952 : Oid prid;
1953 :
1954 483 : foreach(lc, rels)
1955 : {
1956 240 : PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
1957 240 : Relation rel = pubrel->relation;
1958 240 : Oid relid = RelationGetRelid(rel);
1959 :
1960 240 : if (pubrel->columns)
1961 0 : ereport(ERROR,
1962 : errcode(ERRCODE_SYNTAX_ERROR),
1963 : errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
1964 :
1965 240 : prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
1966 : ObjectIdGetDatum(relid),
1967 : ObjectIdGetDatum(pubid));
1968 240 : if (!OidIsValid(prid))
1969 : {
1970 6 : if (missing_ok)
1971 0 : continue;
1972 :
1973 6 : ereport(ERROR,
1974 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1975 : errmsg("relation \"%s\" is not part of the publication",
1976 : RelationGetRelationName(rel))));
1977 : }
1978 :
1979 234 : if (pubrel->whereClause)
1980 3 : ereport(ERROR,
1981 : (errcode(ERRCODE_SYNTAX_ERROR),
1982 : errmsg("cannot use a WHERE clause when removing a table from a publication")));
1983 :
1984 231 : ObjectAddressSet(obj, PublicationRelRelationId, prid);
1985 231 : performDeletion(&obj, DROP_CASCADE, 0);
1986 : }
1987 243 : }
1988 :
1989 : /*
1990 : * Add listed schemas to the publication.
1991 : */
1992 : static void
1993 294 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
1994 : AlterPublicationStmt *stmt)
1995 : {
1996 : ListCell *lc;
1997 :
1998 419 : foreach(lc, schemas)
1999 : {
2000 131 : Oid schemaid = lfirst_oid(lc);
2001 : ObjectAddress obj;
2002 :
2003 131 : obj = publication_add_schema(pubid, schemaid, if_not_exists);
2004 125 : if (stmt)
2005 : {
2006 34 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
2007 : (Node *) stmt);
2008 :
2009 34 : InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
2010 : obj.objectId, 0);
2011 : }
2012 : }
2013 288 : }
2014 :
2015 : /*
2016 : * Remove listed schemas from the publication.
2017 : */
2018 : static void
2019 221 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
2020 : {
2021 : ObjectAddress obj;
2022 : ListCell *lc;
2023 : Oid psid;
2024 :
2025 246 : foreach(lc, schemas)
2026 : {
2027 28 : Oid schemaid = lfirst_oid(lc);
2028 :
2029 28 : psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
2030 : Anum_pg_publication_namespace_oid,
2031 : ObjectIdGetDatum(schemaid),
2032 : ObjectIdGetDatum(pubid));
2033 28 : if (!OidIsValid(psid))
2034 : {
2035 3 : if (missing_ok)
2036 0 : continue;
2037 :
2038 3 : ereport(ERROR,
2039 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2040 : errmsg("tables from schema \"%s\" are not part of the publication",
2041 : get_namespace_name(schemaid))));
2042 : }
2043 :
2044 25 : ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
2045 25 : performDeletion(&obj, DROP_CASCADE, 0);
2046 : }
2047 218 : }
2048 :
2049 : /*
2050 : * Internal workhorse for changing a publication owner
2051 : */
2052 : static void
2053 18 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
2054 : {
2055 : Form_pg_publication form;
2056 :
2057 18 : form = (Form_pg_publication) GETSTRUCT(tup);
2058 :
2059 18 : if (form->pubowner == newOwnerId)
2060 6 : return;
2061 :
2062 12 : if (!superuser())
2063 : {
2064 : AclResult aclresult;
2065 :
2066 : /* Must be owner */
2067 6 : if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
2068 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
2069 0 : NameStr(form->pubname));
2070 :
2071 : /* Must be able to become new owner */
2072 6 : check_can_set_role(GetUserId(), newOwnerId);
2073 :
2074 : /* New owner must have CREATE privilege on database */
2075 6 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
2076 6 : if (aclresult != ACLCHECK_OK)
2077 0 : aclcheck_error(aclresult, OBJECT_DATABASE,
2078 0 : get_database_name(MyDatabaseId));
2079 :
2080 6 : if (!superuser_arg(newOwnerId))
2081 : {
2082 6 : if (form->puballtables || form->puballsequences ||
2083 3 : is_schema_publication(form->oid))
2084 3 : ereport(ERROR,
2085 : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2086 : errmsg("permission denied to change owner of publication \"%s\"",
2087 : NameStr(form->pubname)),
2088 : errhint("The owner of a FOR ALL TABLES or ALL SEQUENCES or TABLES IN SCHEMA publication must be a superuser."));
2089 : }
2090 : }
2091 :
2092 9 : form->pubowner = newOwnerId;
2093 9 : CatalogTupleUpdate(rel, &tup->t_self, tup);
2094 :
2095 : /* Update owner dependency reference */
2096 9 : changeDependencyOnOwner(PublicationRelationId,
2097 : form->oid,
2098 : newOwnerId);
2099 :
2100 9 : InvokeObjectPostAlterHook(PublicationRelationId,
2101 : form->oid, 0);
2102 : }
2103 :
2104 : /*
2105 : * Change publication owner -- by name
2106 : */
2107 : ObjectAddress
2108 18 : AlterPublicationOwner(const char *name, Oid newOwnerId)
2109 : {
2110 : Oid pubid;
2111 : HeapTuple tup;
2112 : Relation rel;
2113 : ObjectAddress address;
2114 : Form_pg_publication pubform;
2115 :
2116 18 : rel = table_open(PublicationRelationId, RowExclusiveLock);
2117 :
2118 18 : tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
2119 :
2120 18 : if (!HeapTupleIsValid(tup))
2121 0 : ereport(ERROR,
2122 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2123 : errmsg("publication \"%s\" does not exist", name)));
2124 :
2125 18 : pubform = (Form_pg_publication) GETSTRUCT(tup);
2126 18 : pubid = pubform->oid;
2127 :
2128 18 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
2129 :
2130 15 : ObjectAddressSet(address, PublicationRelationId, pubid);
2131 :
2132 15 : heap_freetuple(tup);
2133 :
2134 15 : table_close(rel, RowExclusiveLock);
2135 :
2136 15 : return address;
2137 : }
2138 :
2139 : /*
2140 : * Change publication owner -- by OID
2141 : */
2142 : void
2143 0 : AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId)
2144 : {
2145 : HeapTuple tup;
2146 : Relation rel;
2147 :
2148 0 : rel = table_open(PublicationRelationId, RowExclusiveLock);
2149 :
2150 0 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
2151 :
2152 0 : if (!HeapTupleIsValid(tup))
2153 0 : ereport(ERROR,
2154 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2155 : errmsg("publication with OID %u does not exist", pubid)));
2156 :
2157 0 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
2158 :
2159 0 : heap_freetuple(tup);
2160 :
2161 0 : table_close(rel, RowExclusiveLock);
2162 0 : }
2163 :
2164 : /*
2165 : * Extract the publish_generated_columns option value from a DefElem. "stored"
2166 : * and "none" values are accepted.
2167 : */
2168 : static char
2169 38 : defGetGeneratedColsOption(DefElem *def)
2170 : {
2171 38 : char *sval = "";
2172 :
2173 : /*
2174 : * A parameter value is required.
2175 : */
2176 38 : if (def->arg)
2177 : {
2178 35 : sval = defGetString(def);
2179 :
2180 35 : if (pg_strcasecmp(sval, "none") == 0)
2181 11 : return PUBLISH_GENCOLS_NONE;
2182 24 : if (pg_strcasecmp(sval, "stored") == 0)
2183 21 : return PUBLISH_GENCOLS_STORED;
2184 : }
2185 :
2186 6 : ereport(ERROR,
2187 : errcode(ERRCODE_SYNTAX_ERROR),
2188 : errmsg("invalid value for publication parameter \"%s\": \"%s\"", def->defname, sval),
2189 : errdetail("Valid values are \"%s\" and \"%s\".", "none", "stored"));
2190 :
2191 : return PUBLISH_GENCOLS_NONE; /* keep compiler quiet */
2192 : }
|