Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * publicationcmds.c
4 : * publication manipulation
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/publicationcmds.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/htup_details.h"
18 : #include "access/table.h"
19 : #include "access/xact.h"
20 : #include "catalog/catalog.h"
21 : #include "catalog/indexing.h"
22 : #include "catalog/namespace.h"
23 : #include "catalog/objectaccess.h"
24 : #include "catalog/objectaddress.h"
25 : #include "catalog/pg_database.h"
26 : #include "catalog/pg_inherits.h"
27 : #include "catalog/pg_namespace.h"
28 : #include "catalog/pg_proc.h"
29 : #include "catalog/pg_publication.h"
30 : #include "catalog/pg_publication_namespace.h"
31 : #include "catalog/pg_publication_rel.h"
32 : #include "commands/dbcommands.h"
33 : #include "commands/defrem.h"
34 : #include "commands/event_trigger.h"
35 : #include "commands/publicationcmds.h"
36 : #include "miscadmin.h"
37 : #include "nodes/nodeFuncs.h"
38 : #include "parser/parse_clause.h"
39 : #include "parser/parse_collate.h"
40 : #include "parser/parse_relation.h"
41 : #include "storage/lmgr.h"
42 : #include "utils/acl.h"
43 : #include "utils/builtins.h"
44 : #include "utils/inval.h"
45 : #include "utils/lsyscache.h"
46 : #include "utils/rel.h"
47 : #include "utils/syscache.h"
48 : #include "utils/varlena.h"
49 :
50 :
51 : /*
52 : * Information used to validate the columns in the row filter expression. See
53 : * contain_invalid_rfcolumn_walker for details.
54 : */
55 : typedef struct rf_context
56 : {
57 : Bitmapset *bms_replident; /* bitset of replica identity columns */
58 : bool pubviaroot; /* true if we are validating the parent
59 : * relation's row filter */
60 : Oid relid; /* relid of the relation */
61 : Oid parentid; /* relid of the parent relation */
62 : } rf_context;
63 :
64 : static List *OpenTableList(List *tables);
65 : static void CloseTableList(List *rels);
66 : static void LockSchemaList(List *schemalist);
67 : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
68 : AlterPublicationStmt *stmt);
69 : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
70 : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
71 : AlterPublicationStmt *stmt);
72 : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
73 :
74 :
75 : static void
76 856 : parse_publication_options(ParseState *pstate,
77 : List *options,
78 : bool *publish_given,
79 : PublicationActions *pubactions,
80 : bool *publish_via_partition_root_given,
81 : bool *publish_via_partition_root,
82 : bool *publish_generated_columns_given,
83 : bool *publish_generated_columns)
84 : {
85 : ListCell *lc;
86 :
87 856 : *publish_given = false;
88 856 : *publish_via_partition_root_given = false;
89 856 : *publish_generated_columns_given = false;
90 :
91 : /* defaults */
92 856 : pubactions->pubinsert = true;
93 856 : pubactions->pubupdate = true;
94 856 : pubactions->pubdelete = true;
95 856 : pubactions->pubtruncate = true;
96 856 : *publish_via_partition_root = false;
97 856 : *publish_generated_columns = false;
98 :
99 : /* Parse options */
100 1180 : foreach(lc, options)
101 : {
102 354 : DefElem *defel = (DefElem *) lfirst(lc);
103 :
104 354 : if (strcmp(defel->defname, "publish") == 0)
105 : {
106 : char *publish;
107 : List *publish_list;
108 : ListCell *lc2;
109 :
110 122 : if (*publish_given)
111 0 : errorConflictingDefElem(defel, pstate);
112 :
113 : /*
114 : * If publish option was given only the explicitly listed actions
115 : * should be published.
116 : */
117 122 : pubactions->pubinsert = false;
118 122 : pubactions->pubupdate = false;
119 122 : pubactions->pubdelete = false;
120 122 : pubactions->pubtruncate = false;
121 :
122 122 : *publish_given = true;
123 122 : publish = defGetString(defel);
124 :
125 122 : if (!SplitIdentifierString(publish, ',', &publish_list))
126 0 : ereport(ERROR,
127 : (errcode(ERRCODE_SYNTAX_ERROR),
128 : errmsg("invalid list syntax in parameter \"%s\"",
129 : "publish")));
130 :
131 : /* Process the option list. */
132 270 : foreach(lc2, publish_list)
133 : {
134 154 : char *publish_opt = (char *) lfirst(lc2);
135 :
136 154 : if (strcmp(publish_opt, "insert") == 0)
137 108 : pubactions->pubinsert = true;
138 46 : else if (strcmp(publish_opt, "update") == 0)
139 20 : pubactions->pubupdate = true;
140 26 : else if (strcmp(publish_opt, "delete") == 0)
141 10 : pubactions->pubdelete = true;
142 16 : else if (strcmp(publish_opt, "truncate") == 0)
143 10 : pubactions->pubtruncate = true;
144 : else
145 6 : ereport(ERROR,
146 : (errcode(ERRCODE_SYNTAX_ERROR),
147 : errmsg("unrecognized value for publication option \"%s\": \"%s\"",
148 : "publish", publish_opt)));
149 : }
150 : }
151 232 : else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
152 : {
153 162 : if (*publish_via_partition_root_given)
154 6 : errorConflictingDefElem(defel, pstate);
155 156 : *publish_via_partition_root_given = true;
156 156 : *publish_via_partition_root = defGetBoolean(defel);
157 : }
158 70 : else if (strcmp(defel->defname, "publish_generated_columns") == 0)
159 : {
160 64 : if (*publish_generated_columns_given)
161 6 : errorConflictingDefElem(defel, pstate);
162 58 : *publish_generated_columns_given = true;
163 58 : *publish_generated_columns = defGetBoolean(defel);
164 : }
165 : else
166 6 : ereport(ERROR,
167 : (errcode(ERRCODE_SYNTAX_ERROR),
168 : errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
169 : }
170 826 : }
171 :
172 : /*
173 : * Convert the PublicationObjSpecType list into schema oid list and
174 : * PublicationTable list.
175 : */
176 : static void
177 1572 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
178 : List **rels, List **schemas)
179 : {
180 : ListCell *cell;
181 : PublicationObjSpec *pubobj;
182 :
183 1572 : if (!pubobjspec_list)
184 86 : return;
185 :
186 3134 : foreach(cell, pubobjspec_list)
187 : {
188 : Oid schemaid;
189 : List *search_path;
190 :
191 1684 : pubobj = (PublicationObjSpec *) lfirst(cell);
192 :
193 1684 : switch (pubobj->pubobjtype)
194 : {
195 1328 : case PUBLICATIONOBJ_TABLE:
196 1328 : *rels = lappend(*rels, pubobj->pubtable);
197 1328 : break;
198 332 : case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
199 332 : schemaid = get_namespace_oid(pubobj->name, false);
200 :
201 : /* Filter out duplicates if user specifies "sch1, sch1" */
202 302 : *schemas = list_append_unique_oid(*schemas, schemaid);
203 302 : break;
204 24 : case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
205 24 : search_path = fetch_search_path(false);
206 24 : if (search_path == NIL) /* nothing valid in search_path? */
207 6 : ereport(ERROR,
208 : errcode(ERRCODE_UNDEFINED_SCHEMA),
209 : errmsg("no schema has been selected for CURRENT_SCHEMA"));
210 :
211 18 : schemaid = linitial_oid(search_path);
212 18 : list_free(search_path);
213 :
214 : /* Filter out duplicates if user specifies "sch1, sch1" */
215 18 : *schemas = list_append_unique_oid(*schemas, schemaid);
216 18 : break;
217 0 : default:
218 : /* shouldn't happen */
219 0 : elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
220 : break;
221 : }
222 : }
223 : }
224 :
225 : /*
226 : * Returns true if any of the columns used in the row filter WHERE expression is
227 : * not part of REPLICA IDENTITY, false otherwise.
228 : */
229 : static bool
230 258 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
231 : {
232 258 : if (node == NULL)
233 0 : return false;
234 :
235 258 : if (IsA(node, Var))
236 : {
237 104 : Var *var = (Var *) node;
238 104 : AttrNumber attnum = var->varattno;
239 :
240 : /*
241 : * If pubviaroot is true, we are validating the row filter of the
242 : * parent table, but the bitmap contains the replica identity
243 : * information of the child table. So, get the column number of the
244 : * child table as parent and child column order could be different.
245 : */
246 104 : if (context->pubviaroot)
247 : {
248 16 : char *colname = get_attname(context->parentid, attnum, false);
249 :
250 16 : attnum = get_attnum(context->relid, colname);
251 : }
252 :
253 104 : if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
254 104 : context->bms_replident))
255 60 : return true;
256 : }
257 :
258 198 : return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
259 : context);
260 : }
261 :
262 : /*
263 : * Check if all columns referenced in the filter expression are part of the
264 : * REPLICA IDENTITY index or not.
265 : *
266 : * Returns true if any invalid column is found.
267 : */
268 : bool
269 698 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
270 : bool pubviaroot)
271 : {
272 : HeapTuple rftuple;
273 698 : Oid relid = RelationGetRelid(relation);
274 698 : Oid publish_as_relid = RelationGetRelid(relation);
275 698 : bool result = false;
276 : Datum rfdatum;
277 : bool rfisnull;
278 :
279 : /*
280 : * FULL means all columns are in the REPLICA IDENTITY, so all columns are
281 : * allowed in the row filter and we can skip the validation.
282 : */
283 698 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
284 174 : return false;
285 :
286 : /*
287 : * For a partition, if pubviaroot is true, find the topmost ancestor that
288 : * is published via this publication as we need to use its row filter
289 : * expression to filter the partition's changes.
290 : *
291 : * Note that even though the row filter used is for an ancestor, the
292 : * REPLICA IDENTITY used will be for the actual child table.
293 : */
294 524 : if (pubviaroot && relation->rd_rel->relispartition)
295 : {
296 : publish_as_relid
297 108 : = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
298 :
299 108 : if (!OidIsValid(publish_as_relid))
300 6 : publish_as_relid = relid;
301 : }
302 :
303 524 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
304 : ObjectIdGetDatum(publish_as_relid),
305 : ObjectIdGetDatum(pubid));
306 :
307 524 : if (!HeapTupleIsValid(rftuple))
308 56 : return false;
309 :
310 468 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
311 : Anum_pg_publication_rel_prqual,
312 : &rfisnull);
313 :
314 468 : if (!rfisnull)
315 : {
316 102 : rf_context context = {0};
317 : Node *rfnode;
318 102 : Bitmapset *bms = NULL;
319 :
320 102 : context.pubviaroot = pubviaroot;
321 102 : context.parentid = publish_as_relid;
322 102 : context.relid = relid;
323 :
324 : /* Remember columns that are part of the REPLICA IDENTITY */
325 102 : bms = RelationGetIndexAttrBitmap(relation,
326 : INDEX_ATTR_BITMAP_IDENTITY_KEY);
327 :
328 102 : context.bms_replident = bms;
329 102 : rfnode = stringToNode(TextDatumGetCString(rfdatum));
330 102 : result = contain_invalid_rfcolumn_walker(rfnode, &context);
331 : }
332 :
333 468 : ReleaseSysCache(rftuple);
334 :
335 468 : return result;
336 : }
337 :
338 : /*
339 : * Check for invalid columns in the publication table definition.
340 : *
341 : * This function evaluates two conditions:
342 : *
343 : * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
344 : * by the column list. If any column is missing, *invalid_column_list is set
345 : * to true.
346 : * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
347 : * are published either by listing them in the column list or by enabling
348 : * publish_generated_columns option. If any unpublished generated column is
349 : * found, *invalid_gen_col is set to true.
350 : *
351 : * Returns true if any of the above conditions are not met.
352 : */
353 : bool
354 890 : pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
355 : bool pubviaroot, bool pubgencols,
356 : bool *invalid_column_list,
357 : bool *invalid_gen_col)
358 : {
359 890 : Oid relid = RelationGetRelid(relation);
360 890 : Oid publish_as_relid = RelationGetRelid(relation);
361 : Bitmapset *idattrs;
362 890 : Bitmapset *columns = NULL;
363 890 : TupleDesc desc = RelationGetDescr(relation);
364 : Publication *pub;
365 : int x;
366 :
367 890 : *invalid_column_list = false;
368 890 : *invalid_gen_col = false;
369 :
370 : /*
371 : * For a partition, if pubviaroot is true, find the topmost ancestor that
372 : * is published via this publication as we need to use its column list for
373 : * the changes.
374 : *
375 : * Note that even though the column list used is for an ancestor, the
376 : * REPLICA IDENTITY used will be for the actual child table.
377 : */
378 890 : if (pubviaroot && relation->rd_rel->relispartition)
379 : {
380 160 : publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
381 :
382 160 : if (!OidIsValid(publish_as_relid))
383 36 : publish_as_relid = relid;
384 : }
385 :
386 : /* Fetch the column list */
387 890 : pub = GetPublication(pubid);
388 890 : check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
389 :
390 890 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
391 : {
392 : /* With REPLICA IDENTITY FULL, no column list is allowed. */
393 188 : *invalid_column_list = (columns != NULL);
394 :
395 : /*
396 : * As we don't allow a column list with REPLICA IDENTITY FULL, the
397 : * publish_generated_columns option must be set to true if the table
398 : * has any stored generated columns.
399 : */
400 188 : if (!pubgencols &&
401 182 : relation->rd_att->constr &&
402 70 : relation->rd_att->constr->has_generated_stored)
403 6 : *invalid_gen_col = true;
404 :
405 188 : if (*invalid_gen_col && *invalid_column_list)
406 0 : return true;
407 : }
408 :
409 : /* Remember columns that are part of the REPLICA IDENTITY */
410 890 : idattrs = RelationGetIndexAttrBitmap(relation,
411 : INDEX_ATTR_BITMAP_IDENTITY_KEY);
412 :
413 : /*
414 : * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
415 : * (to handle system columns the usual way), while column list does not
416 : * use offset, so we can't do bms_is_subset(). Instead, we have to loop
417 : * over the idattrs and check all of them are in the list.
418 : */
419 890 : x = -1;
420 1536 : while ((x = bms_next_member(idattrs, x)) >= 0)
421 : {
422 652 : AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber);
423 652 : Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
424 :
425 652 : if (columns == NULL)
426 : {
427 : /*
428 : * The publish_generated_columns option must be set to true if the
429 : * REPLICA IDENTITY contains any stored generated column.
430 : */
431 442 : if (!pubgencols && att->attgenerated)
432 : {
433 6 : *invalid_gen_col = true;
434 6 : break;
435 : }
436 :
437 : /* Skip validating the column list since it is not defined */
438 436 : continue;
439 : }
440 :
441 : /*
442 : * If pubviaroot is true, we are validating the column list of the
443 : * parent table, but the bitmap contains the replica identity
444 : * information of the child table. The parent/child attnums may not
445 : * match, so translate them to the parent - get the attname from the
446 : * child, and look it up in the parent.
447 : */
448 210 : if (pubviaroot)
449 : {
450 : /* attribute name in the child table */
451 80 : char *colname = get_attname(relid, attnum, false);
452 :
453 : /*
454 : * Determine the attnum for the attribute name in parent (we are
455 : * using the column list defined on the parent).
456 : */
457 80 : attnum = get_attnum(publish_as_relid, colname);
458 : }
459 :
460 : /* replica identity column, not covered by the column list */
461 210 : *invalid_column_list |= !bms_is_member(attnum, columns);
462 :
463 210 : if (*invalid_column_list && *invalid_gen_col)
464 0 : break;
465 : }
466 :
467 890 : bms_free(columns);
468 890 : bms_free(idattrs);
469 :
470 890 : return *invalid_column_list || *invalid_gen_col;
471 : }
472 :
473 : /* check_functions_in_node callback */
474 : static bool
475 396 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
476 : {
477 396 : return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
478 : func_id >= FirstNormalObjectId);
479 : }
480 :
481 : /*
482 : * The row filter walker checks if the row filter expression is a "simple
483 : * expression".
484 : *
485 : * It allows only simple or compound expressions such as:
486 : * - (Var Op Const)
487 : * - (Var Op Var)
488 : * - (Var Op Const) AND/OR (Var Op Const)
489 : * - etc
490 : * (where Var is a column of the table this filter belongs to)
491 : *
492 : * The simple expression has the following restrictions:
493 : * - User-defined operators are not allowed;
494 : * - User-defined functions are not allowed;
495 : * - User-defined types are not allowed;
496 : * - User-defined collations are not allowed;
497 : * - Non-immutable built-in functions are not allowed;
498 : * - System columns are not allowed.
499 : *
500 : * NOTES
501 : *
502 : * We don't allow user-defined functions/operators/types/collations because
503 : * (a) if a user drops a user-defined object used in a row filter expression or
504 : * if there is any other error while using it, the logical decoding
505 : * infrastructure won't be able to recover from such an error even if the
506 : * object is recreated again because a historic snapshot is used to evaluate
507 : * the row filter;
508 : * (b) a user-defined function can be used to access tables that could have
509 : * unpleasant results because a historic snapshot is used. That's why only
510 : * immutable built-in functions are allowed in row filter expressions.
511 : *
512 : * We don't allow system columns because currently, we don't have that
513 : * information in the tuple passed to downstream. Also, as we don't replicate
514 : * those to subscribers, there doesn't seem to be a need for a filter on those
515 : * columns.
516 : *
517 : * We can allow other node types after more analysis and testing.
518 : */
519 : static bool
520 1328 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
521 : {
522 1328 : char *errdetail_msg = NULL;
523 :
524 1328 : if (node == NULL)
525 6 : return false;
526 :
527 1322 : switch (nodeTag(node))
528 : {
529 356 : case T_Var:
530 : /* System columns are not allowed. */
531 356 : if (((Var *) node)->varattno < InvalidAttrNumber)
532 6 : errdetail_msg = _("System columns are not allowed.");
533 356 : break;
534 352 : case T_OpExpr:
535 : case T_DistinctExpr:
536 : case T_NullIfExpr:
537 : /* OK, except user-defined operators are not allowed. */
538 352 : if (((OpExpr *) node)->opno >= FirstNormalObjectId)
539 6 : errdetail_msg = _("User-defined operators are not allowed.");
540 352 : break;
541 6 : case T_ScalarArrayOpExpr:
542 : /* OK, except user-defined operators are not allowed. */
543 6 : if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
544 0 : errdetail_msg = _("User-defined operators are not allowed.");
545 :
546 : /*
547 : * We don't need to check the hashfuncid and negfuncid of
548 : * ScalarArrayOpExpr as those functions are only built for a
549 : * subquery.
550 : */
551 6 : break;
552 6 : case T_RowCompareExpr:
553 : {
554 : ListCell *opid;
555 :
556 : /* OK, except user-defined operators are not allowed. */
557 18 : foreach(opid, ((RowCompareExpr *) node)->opnos)
558 : {
559 12 : if (lfirst_oid(opid) >= FirstNormalObjectId)
560 : {
561 0 : errdetail_msg = _("User-defined operators are not allowed.");
562 0 : break;
563 : }
564 : }
565 : }
566 6 : break;
567 596 : case T_Const:
568 : case T_FuncExpr:
569 : case T_BoolExpr:
570 : case T_RelabelType:
571 : case T_CollateExpr:
572 : case T_CaseExpr:
573 : case T_CaseTestExpr:
574 : case T_ArrayExpr:
575 : case T_RowExpr:
576 : case T_CoalesceExpr:
577 : case T_MinMaxExpr:
578 : case T_XmlExpr:
579 : case T_NullTest:
580 : case T_BooleanTest:
581 : case T_List:
582 : /* OK, supported */
583 596 : break;
584 6 : default:
585 6 : errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
586 6 : break;
587 : }
588 :
589 : /*
590 : * For all the supported nodes, if we haven't already found a problem,
591 : * check the types, functions, and collations used in it. We check List
592 : * by walking through each element.
593 : */
594 1322 : if (!errdetail_msg && !IsA(node, List))
595 : {
596 1250 : if (exprType(node) >= FirstNormalObjectId)
597 6 : errdetail_msg = _("User-defined types are not allowed.");
598 1244 : else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
599 : pstate))
600 12 : errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
601 2464 : else if (exprCollation(node) >= FirstNormalObjectId ||
602 1232 : exprInputCollation(node) >= FirstNormalObjectId)
603 6 : errdetail_msg = _("User-defined collations are not allowed.");
604 : }
605 :
606 : /*
607 : * If we found a problem in this node, throw error now. Otherwise keep
608 : * going.
609 : */
610 1322 : if (errdetail_msg)
611 42 : ereport(ERROR,
612 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
613 : errmsg("invalid publication WHERE expression"),
614 : errdetail_internal("%s", errdetail_msg),
615 : parser_errposition(pstate, exprLocation(node))));
616 :
617 1280 : return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
618 : pstate);
619 : }
620 :
621 : /*
622 : * Check if the row filter expression is a "simple expression".
623 : *
624 : * See check_simple_rowfilter_expr_walker for details.
625 : */
626 : static bool
627 344 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
628 : {
629 344 : return check_simple_rowfilter_expr_walker(node, pstate);
630 : }
631 :
632 : /*
633 : * Transform the publication WHERE expression for all the relations in the list,
634 : * ensuring it is coerced to boolean and necessary collation information is
635 : * added if required, and add a new nsitem/RTE for the associated relation to
636 : * the ParseState's namespace list.
637 : *
638 : * Also check the publication row filter expression and throw an error if
639 : * anything not permitted or unexpected is encountered.
640 : */
641 : static void
642 1106 : TransformPubWhereClauses(List *tables, const char *queryString,
643 : bool pubviaroot)
644 : {
645 : ListCell *lc;
646 :
647 2212 : foreach(lc, tables)
648 : {
649 : ParseNamespaceItem *nsitem;
650 1166 : Node *whereclause = NULL;
651 : ParseState *pstate;
652 1166 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
653 :
654 1166 : if (pri->whereClause == NULL)
655 804 : continue;
656 :
657 : /*
658 : * If the publication doesn't publish changes via the root partitioned
659 : * table, the partition's row filter will be used. So disallow using
660 : * WHERE clause on partitioned table in this case.
661 : */
662 362 : if (!pubviaroot &&
663 340 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
664 6 : ereport(ERROR,
665 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
666 : errmsg("cannot use publication WHERE clause for relation \"%s\"",
667 : RelationGetRelationName(pri->relation)),
668 : errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
669 : "publish_via_partition_root")));
670 :
671 : /*
672 : * A fresh pstate is required so that we only have "this" table in its
673 : * rangetable
674 : */
675 356 : pstate = make_parsestate(NULL);
676 356 : pstate->p_sourcetext = queryString;
677 356 : nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
678 : AccessShareLock, NULL,
679 : false, false);
680 356 : addNSItemToQuery(pstate, nsitem, false, true, true);
681 :
682 356 : whereclause = transformWhereClause(pstate,
683 356 : copyObject(pri->whereClause),
684 : EXPR_KIND_WHERE,
685 : "PUBLICATION WHERE");
686 :
687 : /* Fix up collation information */
688 344 : assign_expr_collations(pstate, whereclause);
689 :
690 : /*
691 : * We allow only simple expressions in row filters. See
692 : * check_simple_rowfilter_expr_walker.
693 : */
694 344 : check_simple_rowfilter_expr(whereclause, pstate);
695 :
696 302 : free_parsestate(pstate);
697 :
698 302 : pri->whereClause = whereclause;
699 : }
700 1046 : }
701 :
702 :
703 : /*
704 : * Given a list of tables that are going to be added to a publication,
705 : * verify that they fulfill the necessary preconditions, namely: no tables
706 : * have a column list if any schema is published; and partitioned tables do
707 : * not have column lists if publish_via_partition_root is not set.
708 : *
709 : * 'publish_schema' indicates that the publication contains any TABLES IN
710 : * SCHEMA elements (newly added in this command, or preexisting).
711 : * 'pubviaroot' is the value of publish_via_partition_root.
712 : */
713 : static void
714 1046 : CheckPubRelationColumnList(char *pubname, List *tables,
715 : bool publish_schema, bool pubviaroot)
716 : {
717 : ListCell *lc;
718 :
719 2122 : foreach(lc, tables)
720 : {
721 1106 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
722 :
723 1106 : if (pri->columns == NIL)
724 732 : continue;
725 :
726 : /*
727 : * Disallow specifying column list if any schema is in the
728 : * publication.
729 : *
730 : * XXX We could instead just forbid the case when the publication
731 : * tries to publish the table with a column list and a schema for that
732 : * table. However, if we do that then we need a restriction during
733 : * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
734 : * seem to be a good idea.
735 : */
736 374 : if (publish_schema)
737 24 : ereport(ERROR,
738 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
739 : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
740 : get_namespace_name(RelationGetNamespace(pri->relation)),
741 : RelationGetRelationName(pri->relation), pubname),
742 : errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
743 :
744 : /*
745 : * If the publication doesn't publish changes via the root partitioned
746 : * table, the partition's column list will be used. So disallow using
747 : * a column list on the partitioned table in this case.
748 : */
749 350 : if (!pubviaroot &&
750 274 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
751 6 : ereport(ERROR,
752 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
753 : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
754 : get_namespace_name(RelationGetNamespace(pri->relation)),
755 : RelationGetRelationName(pri->relation), pubname),
756 : errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
757 : "publish_via_partition_root")));
758 : }
759 1016 : }
760 :
761 : /*
762 : * Create new publication.
763 : */
764 : ObjectAddress
765 752 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
766 : {
767 : Relation rel;
768 : ObjectAddress myself;
769 : Oid puboid;
770 : bool nulls[Natts_pg_publication];
771 : Datum values[Natts_pg_publication];
772 : HeapTuple tup;
773 : bool publish_given;
774 : PublicationActions pubactions;
775 : bool publish_via_partition_root_given;
776 : bool publish_via_partition_root;
777 : bool publish_generated_columns_given;
778 : bool publish_generated_columns;
779 : AclResult aclresult;
780 752 : List *relations = NIL;
781 752 : List *schemaidlist = NIL;
782 :
783 : /* must have CREATE privilege on database */
784 752 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
785 752 : if (aclresult != ACLCHECK_OK)
786 6 : aclcheck_error(aclresult, OBJECT_DATABASE,
787 6 : get_database_name(MyDatabaseId));
788 :
789 : /* FOR ALL TABLES requires superuser */
790 746 : if (stmt->for_all_tables && !superuser())
791 0 : ereport(ERROR,
792 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
793 : errmsg("must be superuser to create FOR ALL TABLES publication")));
794 :
795 746 : rel = table_open(PublicationRelationId, RowExclusiveLock);
796 :
797 : /* Check if name is used */
798 746 : puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
799 : CStringGetDatum(stmt->pubname));
800 746 : if (OidIsValid(puboid))
801 6 : ereport(ERROR,
802 : (errcode(ERRCODE_DUPLICATE_OBJECT),
803 : errmsg("publication \"%s\" already exists",
804 : stmt->pubname)));
805 :
806 : /* Form a tuple. */
807 740 : memset(values, 0, sizeof(values));
808 740 : memset(nulls, false, sizeof(nulls));
809 :
810 740 : values[Anum_pg_publication_pubname - 1] =
811 740 : DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
812 740 : values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
813 :
814 740 : parse_publication_options(pstate,
815 : stmt->options,
816 : &publish_given, &pubactions,
817 : &publish_via_partition_root_given,
818 : &publish_via_partition_root,
819 : &publish_generated_columns_given,
820 : &publish_generated_columns);
821 :
822 710 : puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
823 : Anum_pg_publication_oid);
824 710 : values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
825 710 : values[Anum_pg_publication_puballtables - 1] =
826 710 : BoolGetDatum(stmt->for_all_tables);
827 710 : values[Anum_pg_publication_pubinsert - 1] =
828 710 : BoolGetDatum(pubactions.pubinsert);
829 710 : values[Anum_pg_publication_pubupdate - 1] =
830 710 : BoolGetDatum(pubactions.pubupdate);
831 710 : values[Anum_pg_publication_pubdelete - 1] =
832 710 : BoolGetDatum(pubactions.pubdelete);
833 710 : values[Anum_pg_publication_pubtruncate - 1] =
834 710 : BoolGetDatum(pubactions.pubtruncate);
835 710 : values[Anum_pg_publication_pubviaroot - 1] =
836 710 : BoolGetDatum(publish_via_partition_root);
837 710 : values[Anum_pg_publication_pubgencols - 1] =
838 710 : BoolGetDatum(publish_generated_columns);
839 :
840 710 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
841 :
842 : /* Insert tuple into catalog. */
843 710 : CatalogTupleInsert(rel, tup);
844 710 : heap_freetuple(tup);
845 :
846 710 : recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
847 :
848 710 : ObjectAddressSet(myself, PublicationRelationId, puboid);
849 :
850 : /* Make the changes visible. */
851 710 : CommandCounterIncrement();
852 :
853 : /* Associate objects with the publication. */
854 710 : if (stmt->for_all_tables)
855 : {
856 : /* Invalidate relcache so that publication info is rebuilt. */
857 82 : CacheInvalidateRelcacheAll();
858 : }
859 : else
860 : {
861 628 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
862 : &schemaidlist);
863 :
864 : /* FOR TABLES IN SCHEMA requires superuser */
865 610 : if (schemaidlist != NIL && !superuser())
866 6 : ereport(ERROR,
867 : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
868 : errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
869 :
870 604 : if (relations != NIL)
871 : {
872 : List *rels;
873 :
874 410 : rels = OpenTableList(relations);
875 386 : TransformPubWhereClauses(rels, pstate->p_sourcetext,
876 : publish_via_partition_root);
877 :
878 362 : CheckPubRelationColumnList(stmt->pubname, rels,
879 : schemaidlist != NIL,
880 : publish_via_partition_root);
881 :
882 356 : PublicationAddTables(puboid, rels, true, NULL);
883 330 : CloseTableList(rels);
884 : }
885 :
886 524 : if (schemaidlist != NIL)
887 : {
888 : /*
889 : * Schema lock is held until the publication is created to prevent
890 : * concurrent schema deletion.
891 : */
892 134 : LockSchemaList(schemaidlist);
893 134 : PublicationAddSchemas(puboid, schemaidlist, true, NULL);
894 : }
895 : }
896 :
897 600 : table_close(rel, RowExclusiveLock);
898 :
899 600 : InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
900 :
901 600 : if (wal_level != WAL_LEVEL_LOGICAL)
902 344 : ereport(WARNING,
903 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
904 : errmsg("\"wal_level\" is insufficient to publish logical changes"),
905 : errhint("Set \"wal_level\" to \"logical\" before creating subscriptions.")));
906 :
907 600 : return myself;
908 : }
909 :
910 : /*
911 : * Change options of a publication.
912 : */
913 : static void
914 116 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
915 : Relation rel, HeapTuple tup)
916 : {
917 : bool nulls[Natts_pg_publication];
918 : bool replaces[Natts_pg_publication];
919 : Datum values[Natts_pg_publication];
920 : bool publish_given;
921 : PublicationActions pubactions;
922 : bool publish_via_partition_root_given;
923 : bool publish_via_partition_root;
924 : bool publish_generated_columns_given;
925 : bool publish_generated_columns;
926 : ObjectAddress obj;
927 : Form_pg_publication pubform;
928 116 : List *root_relids = NIL;
929 : ListCell *lc;
930 :
931 116 : parse_publication_options(pstate,
932 : stmt->options,
933 : &publish_given, &pubactions,
934 : &publish_via_partition_root_given,
935 : &publish_via_partition_root,
936 : &publish_generated_columns_given,
937 : &publish_generated_columns);
938 :
939 116 : pubform = (Form_pg_publication) GETSTRUCT(tup);
940 :
941 : /*
942 : * If the publication doesn't publish changes via the root partitioned
943 : * table, the partition's row filter and column list will be used. So
944 : * disallow using WHERE clause and column lists on partitioned table in
945 : * this case.
946 : */
947 116 : if (!pubform->puballtables && publish_via_partition_root_given &&
948 80 : !publish_via_partition_root)
949 : {
950 : /*
951 : * Lock the publication so nobody else can do anything with it. This
952 : * prevents concurrent alter to add partitioned table(s) with WHERE
953 : * clause(s) and/or column lists which we don't allow when not
954 : * publishing via root.
955 : */
956 48 : LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
957 : AccessShareLock);
958 :
959 48 : root_relids = GetPublicationRelations(pubform->oid,
960 : PUBLICATION_PART_ROOT);
961 :
962 84 : foreach(lc, root_relids)
963 : {
964 48 : Oid relid = lfirst_oid(lc);
965 : HeapTuple rftuple;
966 : char relkind;
967 : char *relname;
968 : bool has_rowfilter;
969 : bool has_collist;
970 :
971 : /*
972 : * Beware: we don't have lock on the relations, so cope silently
973 : * with the cache lookups returning NULL.
974 : */
975 :
976 48 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
977 : ObjectIdGetDatum(relid),
978 : ObjectIdGetDatum(pubform->oid));
979 48 : if (!HeapTupleIsValid(rftuple))
980 0 : continue;
981 48 : has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
982 48 : has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
983 48 : if (!has_rowfilter && !has_collist)
984 : {
985 12 : ReleaseSysCache(rftuple);
986 12 : continue;
987 : }
988 :
989 36 : relkind = get_rel_relkind(relid);
990 36 : if (relkind != RELKIND_PARTITIONED_TABLE)
991 : {
992 24 : ReleaseSysCache(rftuple);
993 24 : continue;
994 : }
995 12 : relname = get_rel_name(relid);
996 12 : if (relname == NULL) /* table concurrently dropped */
997 : {
998 0 : ReleaseSysCache(rftuple);
999 0 : continue;
1000 : }
1001 :
1002 12 : if (has_rowfilter)
1003 6 : ereport(ERROR,
1004 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1005 : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1006 : "publish_via_partition_root",
1007 : stmt->pubname),
1008 : errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1009 : relname, "publish_via_partition_root")));
1010 : Assert(has_collist);
1011 6 : ereport(ERROR,
1012 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1013 : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1014 : "publish_via_partition_root",
1015 : stmt->pubname),
1016 : errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1017 : relname, "publish_via_partition_root")));
1018 : }
1019 : }
1020 :
1021 : /* Everything ok, form a new tuple. */
1022 104 : memset(values, 0, sizeof(values));
1023 104 : memset(nulls, false, sizeof(nulls));
1024 104 : memset(replaces, false, sizeof(replaces));
1025 :
1026 104 : if (publish_given)
1027 : {
1028 28 : values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
1029 28 : replaces[Anum_pg_publication_pubinsert - 1] = true;
1030 :
1031 28 : values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
1032 28 : replaces[Anum_pg_publication_pubupdate - 1] = true;
1033 :
1034 28 : values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
1035 28 : replaces[Anum_pg_publication_pubdelete - 1] = true;
1036 :
1037 28 : values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
1038 28 : replaces[Anum_pg_publication_pubtruncate - 1] = true;
1039 : }
1040 :
1041 104 : if (publish_via_partition_root_given)
1042 : {
1043 70 : values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
1044 70 : replaces[Anum_pg_publication_pubviaroot - 1] = true;
1045 : }
1046 :
1047 104 : if (publish_generated_columns_given)
1048 : {
1049 6 : values[Anum_pg_publication_pubgencols - 1] = BoolGetDatum(publish_generated_columns);
1050 6 : replaces[Anum_pg_publication_pubgencols - 1] = true;
1051 : }
1052 :
1053 104 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1054 : replaces);
1055 :
1056 : /* Update the catalog. */
1057 104 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1058 :
1059 104 : CommandCounterIncrement();
1060 :
1061 104 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1062 :
1063 : /* Invalidate the relcache. */
1064 104 : if (pubform->puballtables)
1065 : {
1066 8 : CacheInvalidateRelcacheAll();
1067 : }
1068 : else
1069 : {
1070 96 : List *relids = NIL;
1071 96 : List *schemarelids = NIL;
1072 :
1073 : /*
1074 : * For any partitioned tables contained in the publication, we must
1075 : * invalidate all partitions contained in the respective partition
1076 : * trees, not just those explicitly mentioned in the publication.
1077 : */
1078 96 : if (root_relids == NIL)
1079 60 : relids = GetPublicationRelations(pubform->oid,
1080 : PUBLICATION_PART_ALL);
1081 : else
1082 : {
1083 : /*
1084 : * We already got tables explicitly mentioned in the publication.
1085 : * Now get all partitions for the partitioned table in the list.
1086 : */
1087 72 : foreach(lc, root_relids)
1088 36 : relids = GetPubPartitionOptionRelations(relids,
1089 : PUBLICATION_PART_ALL,
1090 : lfirst_oid(lc));
1091 : }
1092 :
1093 96 : schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
1094 : PUBLICATION_PART_ALL);
1095 96 : relids = list_concat_unique_oid(relids, schemarelids);
1096 :
1097 96 : InvalidatePublicationRels(relids);
1098 : }
1099 :
1100 104 : ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
1101 104 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1102 : (Node *) stmt);
1103 :
1104 104 : InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
1105 104 : }
1106 :
1107 : /*
1108 : * Invalidate the relations.
1109 : */
1110 : void
1111 2282 : InvalidatePublicationRels(List *relids)
1112 : {
1113 : /*
1114 : * We don't want to send too many individual messages, at some point it's
1115 : * cheaper to just reset whole relcache.
1116 : */
1117 2282 : if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
1118 : {
1119 : ListCell *lc;
1120 :
1121 19096 : foreach(lc, relids)
1122 16814 : CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
1123 : }
1124 : else
1125 0 : CacheInvalidateRelcacheAll();
1126 2282 : }
1127 :
1128 : /*
1129 : * Add or remove table to/from publication.
1130 : */
1131 : static void
1132 884 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
1133 : List *tables, const char *queryString,
1134 : bool publish_schema)
1135 : {
1136 884 : List *rels = NIL;
1137 884 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1138 884 : Oid pubid = pubform->oid;
1139 :
1140 : /*
1141 : * Nothing to do if no objects, except in SET: for that it is quite
1142 : * possible that user has not specified any tables in which case we need
1143 : * to remove all the existing tables.
1144 : */
1145 884 : if (!tables && stmt->action != AP_SetObjects)
1146 66 : return;
1147 :
1148 818 : rels = OpenTableList(tables);
1149 :
1150 818 : if (stmt->action == AP_AddObjects)
1151 : {
1152 274 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1153 :
1154 256 : publish_schema |= is_schema_publication(pubid);
1155 :
1156 256 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1157 256 : pubform->pubviaroot);
1158 :
1159 244 : PublicationAddTables(pubid, rels, false, stmt);
1160 : }
1161 544 : else if (stmt->action == AP_DropObjects)
1162 98 : PublicationDropTables(pubid, rels, false);
1163 : else /* AP_SetObjects */
1164 : {
1165 446 : List *oldrelids = GetPublicationRelations(pubid,
1166 : PUBLICATION_PART_ROOT);
1167 446 : List *delrels = NIL;
1168 : ListCell *oldlc;
1169 :
1170 446 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1171 :
1172 428 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1173 428 : pubform->pubviaroot);
1174 :
1175 : /*
1176 : * To recreate the relation list for the publication, look for
1177 : * existing relations that do not need to be dropped.
1178 : */
1179 806 : foreach(oldlc, oldrelids)
1180 : {
1181 402 : Oid oldrelid = lfirst_oid(oldlc);
1182 : ListCell *newlc;
1183 : PublicationRelInfo *oldrel;
1184 402 : bool found = false;
1185 : HeapTuple rftuple;
1186 402 : Node *oldrelwhereclause = NULL;
1187 402 : Bitmapset *oldcolumns = NULL;
1188 :
1189 : /* look up the cache for the old relmap */
1190 402 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1191 : ObjectIdGetDatum(oldrelid),
1192 : ObjectIdGetDatum(pubid));
1193 :
1194 : /*
1195 : * See if the existing relation currently has a WHERE clause or a
1196 : * column list. We need to compare those too.
1197 : */
1198 402 : if (HeapTupleIsValid(rftuple))
1199 : {
1200 402 : bool isnull = true;
1201 : Datum whereClauseDatum;
1202 : Datum columnListDatum;
1203 :
1204 : /* Load the WHERE clause for this table. */
1205 402 : whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1206 : Anum_pg_publication_rel_prqual,
1207 : &isnull);
1208 402 : if (!isnull)
1209 210 : oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
1210 :
1211 : /* Transform the int2vector column list to a bitmap. */
1212 402 : columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1213 : Anum_pg_publication_rel_prattrs,
1214 : &isnull);
1215 :
1216 402 : if (!isnull)
1217 136 : oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
1218 :
1219 402 : ReleaseSysCache(rftuple);
1220 : }
1221 :
1222 778 : foreach(newlc, rels)
1223 : {
1224 : PublicationRelInfo *newpubrel;
1225 : Oid newrelid;
1226 404 : Bitmapset *newcolumns = NULL;
1227 :
1228 404 : newpubrel = (PublicationRelInfo *) lfirst(newlc);
1229 404 : newrelid = RelationGetRelid(newpubrel->relation);
1230 :
1231 : /*
1232 : * Validate the column list. If the column list or WHERE
1233 : * clause changes, then the validation done here will be
1234 : * duplicated inside PublicationAddTables(). The validation
1235 : * is cheap enough that that seems harmless.
1236 : */
1237 404 : newcolumns = pub_collist_validate(newpubrel->relation,
1238 : newpubrel->columns);
1239 :
1240 : /*
1241 : * Check if any of the new set of relations matches with the
1242 : * existing relations in the publication. Additionally, if the
1243 : * relation has an associated WHERE clause, check the WHERE
1244 : * expressions also match. Same for the column list. Drop the
1245 : * rest.
1246 : */
1247 392 : if (newrelid == oldrelid)
1248 : {
1249 284 : if (equal(oldrelwhereclause, newpubrel->whereClause) &&
1250 80 : bms_equal(oldcolumns, newcolumns))
1251 : {
1252 16 : found = true;
1253 16 : break;
1254 : }
1255 : }
1256 : }
1257 :
1258 : /*
1259 : * Add the non-matched relations to a list so that they can be
1260 : * dropped.
1261 : */
1262 390 : if (!found)
1263 : {
1264 374 : oldrel = palloc(sizeof(PublicationRelInfo));
1265 374 : oldrel->whereClause = NULL;
1266 374 : oldrel->columns = NIL;
1267 374 : oldrel->relation = table_open(oldrelid,
1268 : ShareUpdateExclusiveLock);
1269 374 : delrels = lappend(delrels, oldrel);
1270 : }
1271 : }
1272 :
1273 : /* And drop them. */
1274 404 : PublicationDropTables(pubid, delrels, true);
1275 :
1276 : /*
1277 : * Don't bother calculating the difference for adding, we'll catch and
1278 : * skip existing ones when doing catalog update.
1279 : */
1280 404 : PublicationAddTables(pubid, rels, true, stmt);
1281 :
1282 404 : CloseTableList(delrels);
1283 : }
1284 :
1285 692 : CloseTableList(rels);
1286 : }
1287 :
1288 : /*
1289 : * Alter the publication schemas.
1290 : *
1291 : * Add or remove schemas to/from publication.
1292 : */
1293 : static void
1294 758 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
1295 : HeapTuple tup, List *schemaidlist)
1296 : {
1297 758 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1298 :
1299 : /*
1300 : * Nothing to do if no objects, except in SET: for that it is quite
1301 : * possible that user has not specified any schemas in which case we need
1302 : * to remove all the existing schemas.
1303 : */
1304 758 : if (!schemaidlist && stmt->action != AP_SetObjects)
1305 288 : return;
1306 :
1307 : /*
1308 : * Schema lock is held until the publication is altered to prevent
1309 : * concurrent schema deletion.
1310 : */
1311 470 : LockSchemaList(schemaidlist);
1312 470 : if (stmt->action == AP_AddObjects)
1313 : {
1314 : ListCell *lc;
1315 : List *reloids;
1316 :
1317 28 : reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
1318 :
1319 36 : foreach(lc, reloids)
1320 : {
1321 : HeapTuple coltuple;
1322 :
1323 14 : coltuple = SearchSysCache2(PUBLICATIONRELMAP,
1324 : ObjectIdGetDatum(lfirst_oid(lc)),
1325 : ObjectIdGetDatum(pubform->oid));
1326 :
1327 14 : if (!HeapTupleIsValid(coltuple))
1328 0 : continue;
1329 :
1330 : /*
1331 : * Disallow adding schema if column list is already part of the
1332 : * publication. See CheckPubRelationColumnList.
1333 : */
1334 14 : if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
1335 6 : ereport(ERROR,
1336 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1337 : errmsg("cannot add schema to publication \"%s\"",
1338 : stmt->pubname),
1339 : errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1340 :
1341 8 : ReleaseSysCache(coltuple);
1342 : }
1343 :
1344 22 : PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
1345 : }
1346 442 : else if (stmt->action == AP_DropObjects)
1347 38 : PublicationDropSchemas(pubform->oid, schemaidlist, false);
1348 : else /* AP_SetObjects */
1349 : {
1350 404 : List *oldschemaids = GetPublicationSchemas(pubform->oid);
1351 404 : List *delschemas = NIL;
1352 :
1353 : /* Identify which schemas should be dropped */
1354 404 : delschemas = list_difference_oid(oldschemaids, schemaidlist);
1355 :
1356 : /*
1357 : * Schema lock is held until the publication is altered to prevent
1358 : * concurrent schema deletion.
1359 : */
1360 404 : LockSchemaList(delschemas);
1361 :
1362 : /* And drop them */
1363 404 : PublicationDropSchemas(pubform->oid, delschemas, true);
1364 :
1365 : /*
1366 : * Don't bother calculating the difference for adding, we'll catch and
1367 : * skip existing ones when doing catalog update.
1368 : */
1369 404 : PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
1370 : }
1371 : }
1372 :
1373 : /*
1374 : * Check if relations and schemas can be in a given publication and throw
1375 : * appropriate error if not.
1376 : */
1377 : static void
1378 926 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
1379 : List *tables, List *schemaidlist)
1380 : {
1381 926 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1382 :
1383 926 : if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
1384 94 : schemaidlist && !superuser())
1385 6 : ereport(ERROR,
1386 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1387 : errmsg("must be superuser to add or set schemas")));
1388 :
1389 : /*
1390 : * Check that user is allowed to manipulate the publication tables in
1391 : * schema
1392 : */
1393 920 : if (schemaidlist && pubform->puballtables)
1394 18 : ereport(ERROR,
1395 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1396 : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1397 : NameStr(pubform->pubname)),
1398 : errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
1399 :
1400 : /* Check that user is allowed to manipulate the publication tables. */
1401 902 : if (tables && pubform->puballtables)
1402 18 : ereport(ERROR,
1403 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1404 : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1405 : NameStr(pubform->pubname)),
1406 : errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
1407 884 : }
1408 :
1409 : /*
1410 : * Alter the existing publication.
1411 : *
1412 : * This is dispatcher function for AlterPublicationOptions,
1413 : * AlterPublicationSchemas and AlterPublicationTables.
1414 : */
1415 : void
1416 1060 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
1417 : {
1418 : Relation rel;
1419 : HeapTuple tup;
1420 : Form_pg_publication pubform;
1421 :
1422 1060 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1423 :
1424 1060 : tup = SearchSysCacheCopy1(PUBLICATIONNAME,
1425 : CStringGetDatum(stmt->pubname));
1426 :
1427 1060 : if (!HeapTupleIsValid(tup))
1428 0 : ereport(ERROR,
1429 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1430 : errmsg("publication \"%s\" does not exist",
1431 : stmt->pubname)));
1432 :
1433 1060 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1434 :
1435 : /* must be owner */
1436 1060 : if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
1437 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
1438 0 : stmt->pubname);
1439 :
1440 1060 : if (stmt->options)
1441 116 : AlterPublicationOptions(pstate, stmt, rel, tup);
1442 : else
1443 : {
1444 944 : List *relations = NIL;
1445 944 : List *schemaidlist = NIL;
1446 944 : Oid pubid = pubform->oid;
1447 :
1448 944 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
1449 : &schemaidlist);
1450 :
1451 926 : CheckAlterPublication(stmt, tup, relations, schemaidlist);
1452 :
1453 884 : heap_freetuple(tup);
1454 :
1455 : /* Lock the publication so nobody else can do anything with it. */
1456 884 : LockDatabaseObject(PublicationRelationId, pubid, 0,
1457 : AccessExclusiveLock);
1458 :
1459 : /*
1460 : * It is possible that by the time we acquire the lock on publication,
1461 : * concurrent DDL has removed it. We can test this by checking the
1462 : * existence of publication. We get the tuple again to avoid the risk
1463 : * of any publication option getting changed.
1464 : */
1465 884 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1466 884 : if (!HeapTupleIsValid(tup))
1467 0 : ereport(ERROR,
1468 : errcode(ERRCODE_UNDEFINED_OBJECT),
1469 : errmsg("publication \"%s\" does not exist",
1470 : stmt->pubname));
1471 :
1472 884 : AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
1473 : schemaidlist != NIL);
1474 758 : AlterPublicationSchemas(stmt, tup, schemaidlist);
1475 : }
1476 :
1477 : /* Cleanup. */
1478 844 : heap_freetuple(tup);
1479 844 : table_close(rel, RowExclusiveLock);
1480 844 : }
1481 :
1482 : /*
1483 : * Remove relation from publication by mapping OID.
1484 : */
1485 : void
1486 810 : RemovePublicationRelById(Oid proid)
1487 : {
1488 : Relation rel;
1489 : HeapTuple tup;
1490 : Form_pg_publication_rel pubrel;
1491 810 : List *relids = NIL;
1492 :
1493 810 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
1494 :
1495 810 : tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
1496 :
1497 810 : if (!HeapTupleIsValid(tup))
1498 0 : elog(ERROR, "cache lookup failed for publication table %u",
1499 : proid);
1500 :
1501 810 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1502 :
1503 : /*
1504 : * Invalidate relcache so that publication info is rebuilt.
1505 : *
1506 : * For the partitioned tables, we must invalidate all partitions contained
1507 : * in the respective partition hierarchies, not just the one explicitly
1508 : * mentioned in the publication. This is required because we implicitly
1509 : * publish the child tables when the parent table is published.
1510 : */
1511 810 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
1512 : pubrel->prrelid);
1513 :
1514 810 : InvalidatePublicationRels(relids);
1515 :
1516 810 : CatalogTupleDelete(rel, &tup->t_self);
1517 :
1518 810 : ReleaseSysCache(tup);
1519 :
1520 810 : table_close(rel, RowExclusiveLock);
1521 810 : }
1522 :
1523 : /*
1524 : * Remove the publication by mapping OID.
1525 : */
1526 : void
1527 424 : RemovePublicationById(Oid pubid)
1528 : {
1529 : Relation rel;
1530 : HeapTuple tup;
1531 : Form_pg_publication pubform;
1532 :
1533 424 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1534 :
1535 424 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1536 424 : if (!HeapTupleIsValid(tup))
1537 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1538 :
1539 424 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1540 :
1541 : /* Invalidate relcache so that publication info is rebuilt. */
1542 424 : if (pubform->puballtables)
1543 46 : CacheInvalidateRelcacheAll();
1544 :
1545 424 : CatalogTupleDelete(rel, &tup->t_self);
1546 :
1547 424 : ReleaseSysCache(tup);
1548 :
1549 424 : table_close(rel, RowExclusiveLock);
1550 424 : }
1551 :
1552 : /*
1553 : * Remove schema from publication by mapping OID.
1554 : */
1555 : void
1556 192 : RemovePublicationSchemaById(Oid psoid)
1557 : {
1558 : Relation rel;
1559 : HeapTuple tup;
1560 192 : List *schemaRels = NIL;
1561 : Form_pg_publication_namespace pubsch;
1562 :
1563 192 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
1564 :
1565 192 : tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
1566 :
1567 192 : if (!HeapTupleIsValid(tup))
1568 0 : elog(ERROR, "cache lookup failed for publication schema %u", psoid);
1569 :
1570 192 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1571 :
1572 : /*
1573 : * Invalidate relcache so that publication info is rebuilt. See
1574 : * RemovePublicationRelById for why we need to consider all the
1575 : * partitions.
1576 : */
1577 192 : schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
1578 : PUBLICATION_PART_ALL);
1579 192 : InvalidatePublicationRels(schemaRels);
1580 :
1581 192 : CatalogTupleDelete(rel, &tup->t_self);
1582 :
1583 192 : ReleaseSysCache(tup);
1584 :
1585 192 : table_close(rel, RowExclusiveLock);
1586 192 : }
1587 :
1588 : /*
1589 : * Open relations specified by a PublicationTable list.
1590 : * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1591 : * add them to a publication.
1592 : */
1593 : static List *
1594 1228 : OpenTableList(List *tables)
1595 : {
1596 1228 : List *relids = NIL;
1597 1228 : List *rels = NIL;
1598 : ListCell *lc;
1599 1228 : List *relids_with_rf = NIL;
1600 1228 : List *relids_with_collist = NIL;
1601 :
1602 : /*
1603 : * Open, share-lock, and check all the explicitly-specified relations
1604 : */
1605 2514 : foreach(lc, tables)
1606 : {
1607 1310 : PublicationTable *t = lfirst_node(PublicationTable, lc);
1608 1310 : bool recurse = t->relation->inh;
1609 : Relation rel;
1610 : Oid myrelid;
1611 : PublicationRelInfo *pub_rel;
1612 :
1613 : /* Allow query cancel in case this takes a long time */
1614 1310 : CHECK_FOR_INTERRUPTS();
1615 :
1616 1310 : rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
1617 1310 : myrelid = RelationGetRelid(rel);
1618 :
1619 : /*
1620 : * Filter out duplicates if user specifies "foo, foo".
1621 : *
1622 : * Note that this algorithm is known to not be very efficient (O(N^2))
1623 : * but given that it only works on list of tables given to us by user
1624 : * it's deemed acceptable.
1625 : */
1626 1310 : if (list_member_oid(relids, myrelid))
1627 : {
1628 : /* Disallow duplicate tables if there are any with row filters. */
1629 24 : if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
1630 12 : ereport(ERROR,
1631 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1632 : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1633 : RelationGetRelationName(rel))));
1634 :
1635 : /* Disallow duplicate tables if there are any with column lists. */
1636 12 : if (t->columns || list_member_oid(relids_with_collist, myrelid))
1637 12 : ereport(ERROR,
1638 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1639 : errmsg("conflicting or redundant column lists for table \"%s\"",
1640 : RelationGetRelationName(rel))));
1641 :
1642 0 : table_close(rel, ShareUpdateExclusiveLock);
1643 0 : continue;
1644 : }
1645 :
1646 1286 : pub_rel = palloc(sizeof(PublicationRelInfo));
1647 1286 : pub_rel->relation = rel;
1648 1286 : pub_rel->whereClause = t->whereClause;
1649 1286 : pub_rel->columns = t->columns;
1650 1286 : rels = lappend(rels, pub_rel);
1651 1286 : relids = lappend_oid(relids, myrelid);
1652 :
1653 1286 : if (t->whereClause)
1654 372 : relids_with_rf = lappend_oid(relids_with_rf, myrelid);
1655 :
1656 1286 : if (t->columns)
1657 380 : relids_with_collist = lappend_oid(relids_with_collist, myrelid);
1658 :
1659 : /*
1660 : * Add children of this rel, if requested, so that they too are added
1661 : * to the publication. A partitioned table can't have any inheritance
1662 : * children other than its partitions, which need not be explicitly
1663 : * added to the publication.
1664 : */
1665 1286 : if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1666 : {
1667 : List *children;
1668 : ListCell *child;
1669 :
1670 1104 : children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
1671 : NULL);
1672 :
1673 2216 : foreach(child, children)
1674 : {
1675 1112 : Oid childrelid = lfirst_oid(child);
1676 :
1677 : /* Allow query cancel in case this takes a long time */
1678 1112 : CHECK_FOR_INTERRUPTS();
1679 :
1680 : /*
1681 : * Skip duplicates if user specified both parent and child
1682 : * tables.
1683 : */
1684 1112 : if (list_member_oid(relids, childrelid))
1685 : {
1686 : /*
1687 : * We don't allow to specify row filter for both parent
1688 : * and child table at the same time as it is not very
1689 : * clear which one should be given preference.
1690 : */
1691 1104 : if (childrelid != myrelid &&
1692 0 : (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
1693 0 : ereport(ERROR,
1694 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1695 : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1696 : RelationGetRelationName(rel))));
1697 :
1698 : /*
1699 : * We don't allow to specify column list for both parent
1700 : * and child table at the same time as it is not very
1701 : * clear which one should be given preference.
1702 : */
1703 1104 : if (childrelid != myrelid &&
1704 0 : (t->columns || list_member_oid(relids_with_collist, childrelid)))
1705 0 : ereport(ERROR,
1706 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1707 : errmsg("conflicting or redundant column lists for table \"%s\"",
1708 : RelationGetRelationName(rel))));
1709 :
1710 1104 : continue;
1711 : }
1712 :
1713 : /* find_all_inheritors already got lock */
1714 8 : rel = table_open(childrelid, NoLock);
1715 8 : pub_rel = palloc(sizeof(PublicationRelInfo));
1716 8 : pub_rel->relation = rel;
1717 : /* child inherits WHERE clause from parent */
1718 8 : pub_rel->whereClause = t->whereClause;
1719 :
1720 : /* child inherits column list from parent */
1721 8 : pub_rel->columns = t->columns;
1722 8 : rels = lappend(rels, pub_rel);
1723 8 : relids = lappend_oid(relids, childrelid);
1724 :
1725 8 : if (t->whereClause)
1726 2 : relids_with_rf = lappend_oid(relids_with_rf, childrelid);
1727 :
1728 8 : if (t->columns)
1729 0 : relids_with_collist = lappend_oid(relids_with_collist, childrelid);
1730 : }
1731 : }
1732 : }
1733 :
1734 1204 : list_free(relids);
1735 1204 : list_free(relids_with_rf);
1736 :
1737 1204 : return rels;
1738 : }
1739 :
1740 : /*
1741 : * Close all relations in the list.
1742 : */
1743 : static void
1744 1426 : CloseTableList(List *rels)
1745 : {
1746 : ListCell *lc;
1747 :
1748 2888 : foreach(lc, rels)
1749 : {
1750 : PublicationRelInfo *pub_rel;
1751 :
1752 1462 : pub_rel = (PublicationRelInfo *) lfirst(lc);
1753 1462 : table_close(pub_rel->relation, NoLock);
1754 : }
1755 :
1756 1426 : list_free_deep(rels);
1757 1426 : }
1758 :
1759 : /*
1760 : * Lock the schemas specified in the schema list in AccessShareLock mode in
1761 : * order to prevent concurrent schema deletion.
1762 : */
1763 : static void
1764 1008 : LockSchemaList(List *schemalist)
1765 : {
1766 : ListCell *lc;
1767 :
1768 1292 : foreach(lc, schemalist)
1769 : {
1770 284 : Oid schemaid = lfirst_oid(lc);
1771 :
1772 : /* Allow query cancel in case this takes a long time */
1773 284 : CHECK_FOR_INTERRUPTS();
1774 284 : LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
1775 :
1776 : /*
1777 : * It is possible that by the time we acquire the lock on schema,
1778 : * concurrent DDL has removed it. We can test this by checking the
1779 : * existence of schema.
1780 : */
1781 284 : if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
1782 0 : ereport(ERROR,
1783 : errcode(ERRCODE_UNDEFINED_SCHEMA),
1784 : errmsg("schema with OID %u does not exist", schemaid));
1785 : }
1786 1008 : }
1787 :
1788 : /*
1789 : * Add listed tables to the publication.
1790 : */
1791 : static void
1792 1004 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
1793 : AlterPublicationStmt *stmt)
1794 : {
1795 : ListCell *lc;
1796 :
1797 : Assert(!stmt || !stmt->for_all_tables);
1798 :
1799 2006 : foreach(lc, rels)
1800 : {
1801 1064 : PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
1802 1064 : Relation rel = pub_rel->relation;
1803 : ObjectAddress obj;
1804 :
1805 : /* Must be owner of the table or superuser. */
1806 1064 : if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
1807 6 : aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
1808 6 : RelationGetRelationName(rel));
1809 :
1810 1058 : obj = publication_add_relation(pubid, pub_rel, if_not_exists);
1811 1002 : if (stmt)
1812 : {
1813 612 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1814 : (Node *) stmt);
1815 :
1816 612 : InvokeObjectPostCreateHook(PublicationRelRelationId,
1817 : obj.objectId, 0);
1818 : }
1819 : }
1820 942 : }
1821 :
1822 : /*
1823 : * Remove listed tables from the publication.
1824 : */
1825 : static void
1826 502 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
1827 : {
1828 : ObjectAddress obj;
1829 : ListCell *lc;
1830 : Oid prid;
1831 :
1832 962 : foreach(lc, rels)
1833 : {
1834 478 : PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
1835 478 : Relation rel = pubrel->relation;
1836 478 : Oid relid = RelationGetRelid(rel);
1837 :
1838 478 : if (pubrel->columns)
1839 0 : ereport(ERROR,
1840 : errcode(ERRCODE_SYNTAX_ERROR),
1841 : errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
1842 :
1843 478 : prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
1844 : ObjectIdGetDatum(relid),
1845 : ObjectIdGetDatum(pubid));
1846 478 : if (!OidIsValid(prid))
1847 : {
1848 12 : if (missing_ok)
1849 0 : continue;
1850 :
1851 12 : ereport(ERROR,
1852 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1853 : errmsg("relation \"%s\" is not part of the publication",
1854 : RelationGetRelationName(rel))));
1855 : }
1856 :
1857 466 : if (pubrel->whereClause)
1858 6 : ereport(ERROR,
1859 : (errcode(ERRCODE_SYNTAX_ERROR),
1860 : errmsg("cannot use a WHERE clause when removing a table from a publication")));
1861 :
1862 460 : ObjectAddressSet(obj, PublicationRelRelationId, prid);
1863 460 : performDeletion(&obj, DROP_CASCADE, 0);
1864 : }
1865 484 : }
1866 :
1867 : /*
1868 : * Add listed schemas to the publication.
1869 : */
1870 : static void
1871 560 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
1872 : AlterPublicationStmt *stmt)
1873 : {
1874 : ListCell *lc;
1875 :
1876 : Assert(!stmt || !stmt->for_all_tables);
1877 :
1878 770 : foreach(lc, schemas)
1879 : {
1880 222 : Oid schemaid = lfirst_oid(lc);
1881 : ObjectAddress obj;
1882 :
1883 222 : obj = publication_add_schema(pubid, schemaid, if_not_exists);
1884 210 : if (stmt)
1885 : {
1886 58 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1887 : (Node *) stmt);
1888 :
1889 58 : InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
1890 : obj.objectId, 0);
1891 : }
1892 : }
1893 548 : }
1894 :
1895 : /*
1896 : * Remove listed schemas from the publication.
1897 : */
1898 : static void
1899 442 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
1900 : {
1901 : ObjectAddress obj;
1902 : ListCell *lc;
1903 : Oid psid;
1904 :
1905 492 : foreach(lc, schemas)
1906 : {
1907 56 : Oid schemaid = lfirst_oid(lc);
1908 :
1909 56 : psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
1910 : Anum_pg_publication_namespace_oid,
1911 : ObjectIdGetDatum(schemaid),
1912 : ObjectIdGetDatum(pubid));
1913 56 : if (!OidIsValid(psid))
1914 : {
1915 6 : if (missing_ok)
1916 0 : continue;
1917 :
1918 6 : ereport(ERROR,
1919 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1920 : errmsg("tables from schema \"%s\" are not part of the publication",
1921 : get_namespace_name(schemaid))));
1922 : }
1923 :
1924 50 : ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
1925 50 : performDeletion(&obj, DROP_CASCADE, 0);
1926 : }
1927 436 : }
1928 :
1929 : /*
1930 : * Internal workhorse for changing a publication owner
1931 : */
1932 : static void
1933 26 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
1934 : {
1935 : Form_pg_publication form;
1936 :
1937 26 : form = (Form_pg_publication) GETSTRUCT(tup);
1938 :
1939 26 : if (form->pubowner == newOwnerId)
1940 2 : return;
1941 :
1942 24 : if (!superuser())
1943 : {
1944 : AclResult aclresult;
1945 :
1946 : /* Must be owner */
1947 12 : if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
1948 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
1949 0 : NameStr(form->pubname));
1950 :
1951 : /* Must be able to become new owner */
1952 12 : check_can_set_role(GetUserId(), newOwnerId);
1953 :
1954 : /* New owner must have CREATE privilege on database */
1955 12 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
1956 12 : if (aclresult != ACLCHECK_OK)
1957 0 : aclcheck_error(aclresult, OBJECT_DATABASE,
1958 0 : get_database_name(MyDatabaseId));
1959 :
1960 12 : if (form->puballtables && !superuser_arg(newOwnerId))
1961 0 : ereport(ERROR,
1962 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1963 : errmsg("permission denied to change owner of publication \"%s\"",
1964 : NameStr(form->pubname)),
1965 : errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
1966 :
1967 12 : if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid))
1968 6 : ereport(ERROR,
1969 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1970 : errmsg("permission denied to change owner of publication \"%s\"",
1971 : NameStr(form->pubname)),
1972 : errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
1973 : }
1974 :
1975 18 : form->pubowner = newOwnerId;
1976 18 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1977 :
1978 : /* Update owner dependency reference */
1979 18 : changeDependencyOnOwner(PublicationRelationId,
1980 : form->oid,
1981 : newOwnerId);
1982 :
1983 18 : InvokeObjectPostAlterHook(PublicationRelationId,
1984 : form->oid, 0);
1985 : }
1986 :
1987 : /*
1988 : * Change publication owner -- by name
1989 : */
1990 : ObjectAddress
1991 26 : AlterPublicationOwner(const char *name, Oid newOwnerId)
1992 : {
1993 : Oid subid;
1994 : HeapTuple tup;
1995 : Relation rel;
1996 : ObjectAddress address;
1997 : Form_pg_publication pubform;
1998 :
1999 26 : rel = table_open(PublicationRelationId, RowExclusiveLock);
2000 :
2001 26 : tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
2002 :
2003 26 : if (!HeapTupleIsValid(tup))
2004 0 : ereport(ERROR,
2005 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2006 : errmsg("publication \"%s\" does not exist", name)));
2007 :
2008 26 : pubform = (Form_pg_publication) GETSTRUCT(tup);
2009 26 : subid = pubform->oid;
2010 :
2011 26 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
2012 :
2013 20 : ObjectAddressSet(address, PublicationRelationId, subid);
2014 :
2015 20 : heap_freetuple(tup);
2016 :
2017 20 : table_close(rel, RowExclusiveLock);
2018 :
2019 20 : return address;
2020 : }
2021 :
2022 : /*
2023 : * Change publication owner -- by OID
2024 : */
2025 : void
2026 0 : AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
2027 : {
2028 : HeapTuple tup;
2029 : Relation rel;
2030 :
2031 0 : rel = table_open(PublicationRelationId, RowExclusiveLock);
2032 :
2033 0 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
2034 :
2035 0 : if (!HeapTupleIsValid(tup))
2036 0 : ereport(ERROR,
2037 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2038 : errmsg("publication with OID %u does not exist", subid)));
2039 :
2040 0 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
2041 :
2042 0 : heap_freetuple(tup);
2043 :
2044 0 : table_close(rel, RowExclusiveLock);
2045 0 : }
|