Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * publicationcmds.c
4 : * publication manipulation
5 : *
6 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/publicationcmds.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/htup_details.h"
18 : #include "access/table.h"
19 : #include "access/xact.h"
20 : #include "catalog/catalog.h"
21 : #include "catalog/indexing.h"
22 : #include "catalog/namespace.h"
23 : #include "catalog/objectaccess.h"
24 : #include "catalog/objectaddress.h"
25 : #include "catalog/pg_database.h"
26 : #include "catalog/pg_inherits.h"
27 : #include "catalog/pg_namespace.h"
28 : #include "catalog/pg_proc.h"
29 : #include "catalog/pg_publication.h"
30 : #include "catalog/pg_publication_namespace.h"
31 : #include "catalog/pg_publication_rel.h"
32 : #include "commands/dbcommands.h"
33 : #include "commands/defrem.h"
34 : #include "commands/event_trigger.h"
35 : #include "commands/publicationcmds.h"
36 : #include "miscadmin.h"
37 : #include "nodes/nodeFuncs.h"
38 : #include "parser/parse_clause.h"
39 : #include "parser/parse_collate.h"
40 : #include "parser/parse_relation.h"
41 : #include "storage/lmgr.h"
42 : #include "utils/acl.h"
43 : #include "utils/builtins.h"
44 : #include "utils/inval.h"
45 : #include "utils/lsyscache.h"
46 : #include "utils/rel.h"
47 : #include "utils/syscache.h"
48 : #include "utils/varlena.h"
49 :
50 :
51 : /*
52 : * Information used to validate the columns in the row filter expression. See
53 : * contain_invalid_rfcolumn_walker for details.
54 : */
55 : typedef struct rf_context
56 : {
57 : Bitmapset *bms_replident; /* bitset of replica identity columns */
58 : bool pubviaroot; /* true if we are validating the parent
59 : * relation's row filter */
60 : Oid relid; /* relid of the relation */
61 : Oid parentid; /* relid of the parent relation */
62 : } rf_context;
63 :
64 : static List *OpenTableList(List *tables);
65 : static void CloseTableList(List *rels);
66 : static void LockSchemaList(List *schemalist);
67 : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
68 : AlterPublicationStmt *stmt);
69 : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
70 : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
71 : AlterPublicationStmt *stmt);
72 : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
73 :
74 :
75 : static void
76 784 : parse_publication_options(ParseState *pstate,
77 : List *options,
78 : bool *publish_given,
79 : PublicationActions *pubactions,
80 : bool *publish_via_partition_root_given,
81 : bool *publish_via_partition_root)
82 : {
83 : ListCell *lc;
84 :
85 784 : *publish_given = false;
86 784 : *publish_via_partition_root_given = false;
87 :
88 : /* defaults */
89 784 : pubactions->pubinsert = true;
90 784 : pubactions->pubupdate = true;
91 784 : pubactions->pubdelete = true;
92 784 : pubactions->pubtruncate = true;
93 784 : *publish_via_partition_root = false;
94 :
95 : /* Parse options */
96 1056 : foreach(lc, options)
97 : {
98 290 : DefElem *defel = (DefElem *) lfirst(lc);
99 :
100 290 : if (strcmp(defel->defname, "publish") == 0)
101 : {
102 : char *publish;
103 : List *publish_list;
104 : ListCell *lc2;
105 :
106 122 : if (*publish_given)
107 0 : errorConflictingDefElem(defel, pstate);
108 :
109 : /*
110 : * If publish option was given only the explicitly listed actions
111 : * should be published.
112 : */
113 122 : pubactions->pubinsert = false;
114 122 : pubactions->pubupdate = false;
115 122 : pubactions->pubdelete = false;
116 122 : pubactions->pubtruncate = false;
117 :
118 122 : *publish_given = true;
119 122 : publish = defGetString(defel);
120 :
121 122 : if (!SplitIdentifierString(publish, ',', &publish_list))
122 0 : ereport(ERROR,
123 : (errcode(ERRCODE_SYNTAX_ERROR),
124 : errmsg("invalid list syntax in parameter \"%s\"",
125 : "publish")));
126 :
127 : /* Process the option list. */
128 270 : foreach(lc2, publish_list)
129 : {
130 154 : char *publish_opt = (char *) lfirst(lc2);
131 :
132 154 : if (strcmp(publish_opt, "insert") == 0)
133 108 : pubactions->pubinsert = true;
134 46 : else if (strcmp(publish_opt, "update") == 0)
135 20 : pubactions->pubupdate = true;
136 26 : else if (strcmp(publish_opt, "delete") == 0)
137 10 : pubactions->pubdelete = true;
138 16 : else if (strcmp(publish_opt, "truncate") == 0)
139 10 : pubactions->pubtruncate = true;
140 : else
141 6 : ereport(ERROR,
142 : (errcode(ERRCODE_SYNTAX_ERROR),
143 : errmsg("unrecognized value for publication option \"%s\": \"%s\"",
144 : "publish", publish_opt)));
145 : }
146 : }
147 168 : else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
148 : {
149 162 : if (*publish_via_partition_root_given)
150 6 : errorConflictingDefElem(defel, pstate);
151 156 : *publish_via_partition_root_given = true;
152 156 : *publish_via_partition_root = defGetBoolean(defel);
153 : }
154 : else
155 6 : ereport(ERROR,
156 : (errcode(ERRCODE_SYNTAX_ERROR),
157 : errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
158 : }
159 766 : }
160 :
161 : /*
162 : * Convert the PublicationObjSpecType list into schema oid list and
163 : * PublicationTable list.
164 : */
165 : static void
166 1502 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
167 : List **rels, List **schemas)
168 : {
169 : ListCell *cell;
170 : PublicationObjSpec *pubobj;
171 :
172 1502 : if (!pubobjspec_list)
173 84 : return;
174 :
175 3000 : foreach(cell, pubobjspec_list)
176 : {
177 : Oid schemaid;
178 : List *search_path;
179 :
180 1618 : pubobj = (PublicationObjSpec *) lfirst(cell);
181 :
182 1618 : switch (pubobj->pubobjtype)
183 : {
184 1262 : case PUBLICATIONOBJ_TABLE:
185 1262 : *rels = lappend(*rels, pubobj->pubtable);
186 1262 : break;
187 332 : case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
188 332 : schemaid = get_namespace_oid(pubobj->name, false);
189 :
190 : /* Filter out duplicates if user specifies "sch1, sch1" */
191 302 : *schemas = list_append_unique_oid(*schemas, schemaid);
192 302 : break;
193 24 : case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
194 24 : search_path = fetch_search_path(false);
195 24 : if (search_path == NIL) /* nothing valid in search_path? */
196 6 : ereport(ERROR,
197 : errcode(ERRCODE_UNDEFINED_SCHEMA),
198 : errmsg("no schema has been selected for CURRENT_SCHEMA"));
199 :
200 18 : schemaid = linitial_oid(search_path);
201 18 : list_free(search_path);
202 :
203 : /* Filter out duplicates if user specifies "sch1, sch1" */
204 18 : *schemas = list_append_unique_oid(*schemas, schemaid);
205 18 : break;
206 0 : default:
207 : /* shouldn't happen */
208 0 : elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
209 : break;
210 : }
211 : }
212 : }
213 :
214 : /*
215 : * Returns true if any of the columns used in the row filter WHERE expression is
216 : * not part of REPLICA IDENTITY, false otherwise.
217 : */
218 : static bool
219 258 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
220 : {
221 258 : if (node == NULL)
222 0 : return false;
223 :
224 258 : if (IsA(node, Var))
225 : {
226 104 : Var *var = (Var *) node;
227 104 : AttrNumber attnum = var->varattno;
228 :
229 : /*
230 : * If pubviaroot is true, we are validating the row filter of the
231 : * parent table, but the bitmap contains the replica identity
232 : * information of the child table. So, get the column number of the
233 : * child table as parent and child column order could be different.
234 : */
235 104 : if (context->pubviaroot)
236 : {
237 16 : char *colname = get_attname(context->parentid, attnum, false);
238 :
239 16 : attnum = get_attnum(context->relid, colname);
240 : }
241 :
242 104 : if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
243 104 : context->bms_replident))
244 60 : return true;
245 : }
246 :
247 198 : return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
248 : (void *) context);
249 : }
250 :
251 : /*
252 : * Check if all columns referenced in the filter expression are part of the
253 : * REPLICA IDENTITY index or not.
254 : *
255 : * Returns true if any invalid column is found.
256 : */
257 : bool
258 662 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
259 : bool pubviaroot)
260 : {
261 : HeapTuple rftuple;
262 662 : Oid relid = RelationGetRelid(relation);
263 662 : Oid publish_as_relid = RelationGetRelid(relation);
264 662 : bool result = false;
265 : Datum rfdatum;
266 : bool rfisnull;
267 :
268 : /*
269 : * FULL means all columns are in the REPLICA IDENTITY, so all columns are
270 : * allowed in the row filter and we can skip the validation.
271 : */
272 662 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
273 156 : return false;
274 :
275 : /*
276 : * For a partition, if pubviaroot is true, find the topmost ancestor that
277 : * is published via this publication as we need to use its row filter
278 : * expression to filter the partition's changes.
279 : *
280 : * Note that even though the row filter used is for an ancestor, the
281 : * REPLICA IDENTITY used will be for the actual child table.
282 : */
283 506 : if (pubviaroot && relation->rd_rel->relispartition)
284 : {
285 : publish_as_relid
286 106 : = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
287 :
288 106 : if (!OidIsValid(publish_as_relid))
289 6 : publish_as_relid = relid;
290 : }
291 :
292 506 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
293 : ObjectIdGetDatum(publish_as_relid),
294 : ObjectIdGetDatum(pubid));
295 :
296 506 : if (!HeapTupleIsValid(rftuple))
297 56 : return false;
298 :
299 450 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
300 : Anum_pg_publication_rel_prqual,
301 : &rfisnull);
302 :
303 450 : if (!rfisnull)
304 : {
305 102 : rf_context context = {0};
306 : Node *rfnode;
307 102 : Bitmapset *bms = NULL;
308 :
309 102 : context.pubviaroot = pubviaroot;
310 102 : context.parentid = publish_as_relid;
311 102 : context.relid = relid;
312 :
313 : /* Remember columns that are part of the REPLICA IDENTITY */
314 102 : bms = RelationGetIndexAttrBitmap(relation,
315 : INDEX_ATTR_BITMAP_IDENTITY_KEY);
316 :
317 102 : context.bms_replident = bms;
318 102 : rfnode = stringToNode(TextDatumGetCString(rfdatum));
319 102 : result = contain_invalid_rfcolumn_walker(rfnode, &context);
320 : }
321 :
322 450 : ReleaseSysCache(rftuple);
323 :
324 450 : return result;
325 : }
326 :
327 : /*
328 : * Check if all columns referenced in the REPLICA IDENTITY are covered by
329 : * the column list.
330 : *
331 : * Returns true if any replica identity column is not covered by column list.
332 : */
333 : bool
334 662 : pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
335 : bool pubviaroot)
336 : {
337 : HeapTuple tuple;
338 662 : Oid relid = RelationGetRelid(relation);
339 662 : Oid publish_as_relid = RelationGetRelid(relation);
340 662 : bool result = false;
341 : Datum datum;
342 : bool isnull;
343 :
344 : /*
345 : * For a partition, if pubviaroot is true, find the topmost ancestor that
346 : * is published via this publication as we need to use its column list for
347 : * the changes.
348 : *
349 : * Note that even though the column list used is for an ancestor, the
350 : * REPLICA IDENTITY used will be for the actual child table.
351 : */
352 662 : if (pubviaroot && relation->rd_rel->relispartition)
353 : {
354 128 : publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
355 :
356 128 : if (!OidIsValid(publish_as_relid))
357 6 : publish_as_relid = relid;
358 : }
359 :
360 662 : tuple = SearchSysCache2(PUBLICATIONRELMAP,
361 : ObjectIdGetDatum(publish_as_relid),
362 : ObjectIdGetDatum(pubid));
363 :
364 662 : if (!HeapTupleIsValid(tuple))
365 64 : return false;
366 :
367 598 : datum = SysCacheGetAttr(PUBLICATIONRELMAP, tuple,
368 : Anum_pg_publication_rel_prattrs,
369 : &isnull);
370 :
371 598 : if (!isnull)
372 : {
373 : int x;
374 : Bitmapset *idattrs;
375 230 : Bitmapset *columns = NULL;
376 :
377 : /* With REPLICA IDENTITY FULL, no column list is allowed. */
378 230 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
379 36 : result = true;
380 :
381 : /* Transform the column list datum to a bitmapset. */
382 230 : columns = pub_collist_to_bitmapset(NULL, datum, NULL);
383 :
384 : /* Remember columns that are part of the REPLICA IDENTITY */
385 230 : idattrs = RelationGetIndexAttrBitmap(relation,
386 : INDEX_ATTR_BITMAP_IDENTITY_KEY);
387 :
388 : /*
389 : * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are
390 : * offset (to handle system columns the usual way), while column list
391 : * does not use offset, so we can't do bms_is_subset(). Instead, we
392 : * have to loop over the idattrs and check all of them are in the
393 : * list.
394 : */
395 230 : x = -1;
396 350 : while ((x = bms_next_member(idattrs, x)) >= 0)
397 : {
398 192 : AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber);
399 :
400 : /*
401 : * If pubviaroot is true, we are validating the column list of the
402 : * parent table, but the bitmap contains the replica identity
403 : * information of the child table. The parent/child attnums may
404 : * not match, so translate them to the parent - get the attname
405 : * from the child, and look it up in the parent.
406 : */
407 192 : if (pubviaroot)
408 : {
409 : /* attribute name in the child table */
410 80 : char *colname = get_attname(relid, attnum, false);
411 :
412 : /*
413 : * Determine the attnum for the attribute name in parent (we
414 : * are using the column list defined on the parent).
415 : */
416 80 : attnum = get_attnum(publish_as_relid, colname);
417 : }
418 :
419 : /* replica identity column, not covered by the column list */
420 192 : if (!bms_is_member(attnum, columns))
421 : {
422 72 : result = true;
423 72 : break;
424 : }
425 : }
426 :
427 230 : bms_free(idattrs);
428 230 : bms_free(columns);
429 : }
430 :
431 598 : ReleaseSysCache(tuple);
432 :
433 598 : return result;
434 : }
435 :
436 : /* check_functions_in_node callback */
437 : static bool
438 396 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
439 : {
440 396 : return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
441 : func_id >= FirstNormalObjectId);
442 : }
443 :
444 : /*
445 : * The row filter walker checks if the row filter expression is a "simple
446 : * expression".
447 : *
448 : * It allows only simple or compound expressions such as:
449 : * - (Var Op Const)
450 : * - (Var Op Var)
451 : * - (Var Op Const) AND/OR (Var Op Const)
452 : * - etc
453 : * (where Var is a column of the table this filter belongs to)
454 : *
455 : * The simple expression has the following restrictions:
456 : * - User-defined operators are not allowed;
457 : * - User-defined functions are not allowed;
458 : * - User-defined types are not allowed;
459 : * - User-defined collations are not allowed;
460 : * - Non-immutable built-in functions are not allowed;
461 : * - System columns are not allowed.
462 : *
463 : * NOTES
464 : *
465 : * We don't allow user-defined functions/operators/types/collations because
466 : * (a) if a user drops a user-defined object used in a row filter expression or
467 : * if there is any other error while using it, the logical decoding
468 : * infrastructure won't be able to recover from such an error even if the
469 : * object is recreated again because a historic snapshot is used to evaluate
470 : * the row filter;
471 : * (b) a user-defined function can be used to access tables that could have
472 : * unpleasant results because a historic snapshot is used. That's why only
473 : * immutable built-in functions are allowed in row filter expressions.
474 : *
475 : * We don't allow system columns because currently, we don't have that
476 : * information in the tuple passed to downstream. Also, as we don't replicate
477 : * those to subscribers, there doesn't seem to be a need for a filter on those
478 : * columns.
479 : *
480 : * We can allow other node types after more analysis and testing.
481 : */
482 : static bool
483 1328 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
484 : {
485 1328 : char *errdetail_msg = NULL;
486 :
487 1328 : if (node == NULL)
488 6 : return false;
489 :
490 1322 : switch (nodeTag(node))
491 : {
492 356 : case T_Var:
493 : /* System columns are not allowed. */
494 356 : if (((Var *) node)->varattno < InvalidAttrNumber)
495 6 : errdetail_msg = _("System columns are not allowed.");
496 356 : break;
497 352 : case T_OpExpr:
498 : case T_DistinctExpr:
499 : case T_NullIfExpr:
500 : /* OK, except user-defined operators are not allowed. */
501 352 : if (((OpExpr *) node)->opno >= FirstNormalObjectId)
502 6 : errdetail_msg = _("User-defined operators are not allowed.");
503 352 : break;
504 6 : case T_ScalarArrayOpExpr:
505 : /* OK, except user-defined operators are not allowed. */
506 6 : if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
507 0 : errdetail_msg = _("User-defined operators are not allowed.");
508 :
509 : /*
510 : * We don't need to check the hashfuncid and negfuncid of
511 : * ScalarArrayOpExpr as those functions are only built for a
512 : * subquery.
513 : */
514 6 : break;
515 6 : case T_RowCompareExpr:
516 : {
517 : ListCell *opid;
518 :
519 : /* OK, except user-defined operators are not allowed. */
520 18 : foreach(opid, ((RowCompareExpr *) node)->opnos)
521 : {
522 12 : if (lfirst_oid(opid) >= FirstNormalObjectId)
523 : {
524 0 : errdetail_msg = _("User-defined operators are not allowed.");
525 0 : break;
526 : }
527 : }
528 : }
529 6 : break;
530 596 : case T_Const:
531 : case T_FuncExpr:
532 : case T_BoolExpr:
533 : case T_RelabelType:
534 : case T_CollateExpr:
535 : case T_CaseExpr:
536 : case T_CaseTestExpr:
537 : case T_ArrayExpr:
538 : case T_RowExpr:
539 : case T_CoalesceExpr:
540 : case T_MinMaxExpr:
541 : case T_XmlExpr:
542 : case T_NullTest:
543 : case T_BooleanTest:
544 : case T_List:
545 : /* OK, supported */
546 596 : break;
547 6 : default:
548 6 : errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
549 6 : break;
550 : }
551 :
552 : /*
553 : * For all the supported nodes, if we haven't already found a problem,
554 : * check the types, functions, and collations used in it. We check List
555 : * by walking through each element.
556 : */
557 1322 : if (!errdetail_msg && !IsA(node, List))
558 : {
559 1250 : if (exprType(node) >= FirstNormalObjectId)
560 6 : errdetail_msg = _("User-defined types are not allowed.");
561 1244 : else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
562 : (void *) pstate))
563 12 : errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
564 2464 : else if (exprCollation(node) >= FirstNormalObjectId ||
565 1232 : exprInputCollation(node) >= FirstNormalObjectId)
566 6 : errdetail_msg = _("User-defined collations are not allowed.");
567 : }
568 :
569 : /*
570 : * If we found a problem in this node, throw error now. Otherwise keep
571 : * going.
572 : */
573 1322 : if (errdetail_msg)
574 42 : ereport(ERROR,
575 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
576 : errmsg("invalid publication WHERE expression"),
577 : errdetail_internal("%s", errdetail_msg),
578 : parser_errposition(pstate, exprLocation(node))));
579 :
580 1280 : return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
581 : (void *) pstate);
582 : }
583 :
584 : /*
585 : * Check if the row filter expression is a "simple expression".
586 : *
587 : * See check_simple_rowfilter_expr_walker for details.
588 : */
589 : static bool
590 344 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
591 : {
592 344 : return check_simple_rowfilter_expr_walker(node, pstate);
593 : }
594 :
595 : /*
596 : * Transform the publication WHERE expression for all the relations in the list,
597 : * ensuring it is coerced to boolean and necessary collation information is
598 : * added if required, and add a new nsitem/RTE for the associated relation to
599 : * the ParseState's namespace list.
600 : *
601 : * Also check the publication row filter expression and throw an error if
602 : * anything not permitted or unexpected is encountered.
603 : */
604 : static void
605 1044 : TransformPubWhereClauses(List *tables, const char *queryString,
606 : bool pubviaroot)
607 : {
608 : ListCell *lc;
609 :
610 2090 : foreach(lc, tables)
611 : {
612 : ParseNamespaceItem *nsitem;
613 1106 : Node *whereclause = NULL;
614 : ParseState *pstate;
615 1106 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
616 :
617 1106 : if (pri->whereClause == NULL)
618 744 : continue;
619 :
620 : /*
621 : * If the publication doesn't publish changes via the root partitioned
622 : * table, the partition's row filter will be used. So disallow using
623 : * WHERE clause on partitioned table in this case.
624 : */
625 362 : if (!pubviaroot &&
626 340 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
627 6 : ereport(ERROR,
628 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
629 : errmsg("cannot use publication WHERE clause for relation \"%s\"",
630 : RelationGetRelationName(pri->relation)),
631 : errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
632 : "publish_via_partition_root")));
633 :
634 : /*
635 : * A fresh pstate is required so that we only have "this" table in its
636 : * rangetable
637 : */
638 356 : pstate = make_parsestate(NULL);
639 356 : pstate->p_sourcetext = queryString;
640 356 : nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
641 : AccessShareLock, NULL,
642 : false, false);
643 356 : addNSItemToQuery(pstate, nsitem, false, true, true);
644 :
645 356 : whereclause = transformWhereClause(pstate,
646 356 : copyObject(pri->whereClause),
647 : EXPR_KIND_WHERE,
648 : "PUBLICATION WHERE");
649 :
650 : /* Fix up collation information */
651 344 : assign_expr_collations(pstate, whereclause);
652 :
653 : /*
654 : * We allow only simple expressions in row filters. See
655 : * check_simple_rowfilter_expr_walker.
656 : */
657 344 : check_simple_rowfilter_expr(whereclause, pstate);
658 :
659 302 : free_parsestate(pstate);
660 :
661 302 : pri->whereClause = whereclause;
662 : }
663 984 : }
664 :
665 :
666 : /*
667 : * Given a list of tables that are going to be added to a publication,
668 : * verify that they fulfill the necessary preconditions, namely: no tables
669 : * have a column list if any schema is published; and partitioned tables do
670 : * not have column lists if publish_via_partition_root is not set.
671 : *
672 : * 'publish_schema' indicates that the publication contains any TABLES IN
673 : * SCHEMA elements (newly added in this command, or preexisting).
674 : * 'pubviaroot' is the value of publish_via_partition_root.
675 : */
676 : static void
677 984 : CheckPubRelationColumnList(char *pubname, List *tables,
678 : bool publish_schema, bool pubviaroot)
679 : {
680 : ListCell *lc;
681 :
682 2000 : foreach(lc, tables)
683 : {
684 1046 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
685 :
686 1046 : if (pri->columns == NIL)
687 720 : continue;
688 :
689 : /*
690 : * Disallow specifying column list if any schema is in the
691 : * publication.
692 : *
693 : * XXX We could instead just forbid the case when the publication
694 : * tries to publish the table with a column list and a schema for that
695 : * table. However, if we do that then we need a restriction during
696 : * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
697 : * seem to be a good idea.
698 : */
699 326 : if (publish_schema)
700 24 : ereport(ERROR,
701 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
702 : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
703 : get_namespace_name(RelationGetNamespace(pri->relation)),
704 : RelationGetRelationName(pri->relation), pubname),
705 : errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
706 :
707 : /*
708 : * If the publication doesn't publish changes via the root partitioned
709 : * table, the partition's column list will be used. So disallow using
710 : * a column list on the partitioned table in this case.
711 : */
712 302 : if (!pubviaroot &&
713 226 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
714 6 : ereport(ERROR,
715 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
716 : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
717 : get_namespace_name(RelationGetNamespace(pri->relation)),
718 : RelationGetRelationName(pri->relation), pubname),
719 : errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
720 : "publish_via_partition_root")));
721 : }
722 954 : }
723 :
724 : /*
725 : * Create new publication.
726 : */
727 : ObjectAddress
728 686 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
729 : {
730 : Relation rel;
731 : ObjectAddress myself;
732 : Oid puboid;
733 : bool nulls[Natts_pg_publication];
734 : Datum values[Natts_pg_publication];
735 : HeapTuple tup;
736 : bool publish_given;
737 : PublicationActions pubactions;
738 : bool publish_via_partition_root_given;
739 : bool publish_via_partition_root;
740 : AclResult aclresult;
741 686 : List *relations = NIL;
742 686 : List *schemaidlist = NIL;
743 :
744 : /* must have CREATE privilege on database */
745 686 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
746 686 : if (aclresult != ACLCHECK_OK)
747 6 : aclcheck_error(aclresult, OBJECT_DATABASE,
748 6 : get_database_name(MyDatabaseId));
749 :
750 : /* FOR ALL TABLES requires superuser */
751 680 : if (stmt->for_all_tables && !superuser())
752 0 : ereport(ERROR,
753 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
754 : errmsg("must be superuser to create FOR ALL TABLES publication")));
755 :
756 680 : rel = table_open(PublicationRelationId, RowExclusiveLock);
757 :
758 : /* Check if name is used */
759 680 : puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
760 : CStringGetDatum(stmt->pubname));
761 680 : if (OidIsValid(puboid))
762 6 : ereport(ERROR,
763 : (errcode(ERRCODE_DUPLICATE_OBJECT),
764 : errmsg("publication \"%s\" already exists",
765 : stmt->pubname)));
766 :
767 : /* Form a tuple. */
768 674 : memset(values, 0, sizeof(values));
769 674 : memset(nulls, false, sizeof(nulls));
770 :
771 674 : values[Anum_pg_publication_pubname - 1] =
772 674 : DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
773 674 : values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
774 :
775 674 : parse_publication_options(pstate,
776 : stmt->options,
777 : &publish_given, &pubactions,
778 : &publish_via_partition_root_given,
779 : &publish_via_partition_root);
780 :
781 656 : puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
782 : Anum_pg_publication_oid);
783 656 : values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
784 656 : values[Anum_pg_publication_puballtables - 1] =
785 656 : BoolGetDatum(stmt->for_all_tables);
786 656 : values[Anum_pg_publication_pubinsert - 1] =
787 656 : BoolGetDatum(pubactions.pubinsert);
788 656 : values[Anum_pg_publication_pubupdate - 1] =
789 656 : BoolGetDatum(pubactions.pubupdate);
790 656 : values[Anum_pg_publication_pubdelete - 1] =
791 656 : BoolGetDatum(pubactions.pubdelete);
792 656 : values[Anum_pg_publication_pubtruncate - 1] =
793 656 : BoolGetDatum(pubactions.pubtruncate);
794 656 : values[Anum_pg_publication_pubviaroot - 1] =
795 656 : BoolGetDatum(publish_via_partition_root);
796 :
797 656 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
798 :
799 : /* Insert tuple into catalog. */
800 656 : CatalogTupleInsert(rel, tup);
801 656 : heap_freetuple(tup);
802 :
803 656 : recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
804 :
805 656 : ObjectAddressSet(myself, PublicationRelationId, puboid);
806 :
807 : /* Make the changes visible. */
808 656 : CommandCounterIncrement();
809 :
810 : /* Associate objects with the publication. */
811 656 : if (stmt->for_all_tables)
812 : {
813 : /* Invalidate relcache so that publication info is rebuilt. */
814 62 : CacheInvalidateRelcacheAll();
815 : }
816 : else
817 : {
818 594 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
819 : &schemaidlist);
820 :
821 : /* FOR TABLES IN SCHEMA requires superuser */
822 576 : if (schemaidlist != NIL && !superuser())
823 6 : ereport(ERROR,
824 : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
825 : errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
826 :
827 570 : if (relations != NIL)
828 : {
829 : List *rels;
830 :
831 378 : rels = OpenTableList(relations);
832 354 : TransformPubWhereClauses(rels, pstate->p_sourcetext,
833 : publish_via_partition_root);
834 :
835 330 : CheckPubRelationColumnList(stmt->pubname, rels,
836 : schemaidlist != NIL,
837 : publish_via_partition_root);
838 :
839 324 : PublicationAddTables(puboid, rels, true, NULL);
840 298 : CloseTableList(rels);
841 : }
842 :
843 490 : if (schemaidlist != NIL)
844 : {
845 : /*
846 : * Schema lock is held until the publication is created to prevent
847 : * concurrent schema deletion.
848 : */
849 134 : LockSchemaList(schemaidlist);
850 134 : PublicationAddSchemas(puboid, schemaidlist, true, NULL);
851 : }
852 : }
853 :
854 546 : table_close(rel, RowExclusiveLock);
855 :
856 546 : InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
857 :
858 546 : if (wal_level != WAL_LEVEL_LOGICAL)
859 308 : ereport(WARNING,
860 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
861 : errmsg("wal_level is insufficient to publish logical changes"),
862 : errhint("Set wal_level to \"logical\" before creating subscriptions.")));
863 :
864 546 : return myself;
865 : }
866 :
867 : /*
868 : * Change options of a publication.
869 : */
870 : static void
871 110 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
872 : Relation rel, HeapTuple tup)
873 : {
874 : bool nulls[Natts_pg_publication];
875 : bool replaces[Natts_pg_publication];
876 : Datum values[Natts_pg_publication];
877 : bool publish_given;
878 : PublicationActions pubactions;
879 : bool publish_via_partition_root_given;
880 : bool publish_via_partition_root;
881 : ObjectAddress obj;
882 : Form_pg_publication pubform;
883 110 : List *root_relids = NIL;
884 : ListCell *lc;
885 :
886 110 : parse_publication_options(pstate,
887 : stmt->options,
888 : &publish_given, &pubactions,
889 : &publish_via_partition_root_given,
890 : &publish_via_partition_root);
891 :
892 110 : pubform = (Form_pg_publication) GETSTRUCT(tup);
893 :
894 : /*
895 : * If the publication doesn't publish changes via the root partitioned
896 : * table, the partition's row filter and column list will be used. So
897 : * disallow using WHERE clause and column lists on partitioned table in
898 : * this case.
899 : */
900 110 : if (!pubform->puballtables && publish_via_partition_root_given &&
901 80 : !publish_via_partition_root)
902 : {
903 : /*
904 : * Lock the publication so nobody else can do anything with it. This
905 : * prevents concurrent alter to add partitioned table(s) with WHERE
906 : * clause(s) and/or column lists which we don't allow when not
907 : * publishing via root.
908 : */
909 48 : LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
910 : AccessShareLock);
911 :
912 48 : root_relids = GetPublicationRelations(pubform->oid,
913 : PUBLICATION_PART_ROOT);
914 :
915 84 : foreach(lc, root_relids)
916 : {
917 48 : Oid relid = lfirst_oid(lc);
918 : HeapTuple rftuple;
919 : char relkind;
920 : char *relname;
921 : bool has_rowfilter;
922 : bool has_collist;
923 :
924 : /*
925 : * Beware: we don't have lock on the relations, so cope silently
926 : * with the cache lookups returning NULL.
927 : */
928 :
929 48 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
930 : ObjectIdGetDatum(relid),
931 : ObjectIdGetDatum(pubform->oid));
932 48 : if (!HeapTupleIsValid(rftuple))
933 0 : continue;
934 48 : has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
935 48 : has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
936 48 : if (!has_rowfilter && !has_collist)
937 : {
938 12 : ReleaseSysCache(rftuple);
939 12 : continue;
940 : }
941 :
942 36 : relkind = get_rel_relkind(relid);
943 36 : if (relkind != RELKIND_PARTITIONED_TABLE)
944 : {
945 24 : ReleaseSysCache(rftuple);
946 24 : continue;
947 : }
948 12 : relname = get_rel_name(relid);
949 12 : if (relname == NULL) /* table concurrently dropped */
950 : {
951 0 : ReleaseSysCache(rftuple);
952 0 : continue;
953 : }
954 :
955 12 : if (has_rowfilter)
956 6 : ereport(ERROR,
957 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
958 : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
959 : "publish_via_partition_root",
960 : stmt->pubname),
961 : errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
962 : relname, "publish_via_partition_root")));
963 : Assert(has_collist);
964 6 : ereport(ERROR,
965 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
966 : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
967 : "publish_via_partition_root",
968 : stmt->pubname),
969 : errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
970 : relname, "publish_via_partition_root")));
971 : }
972 : }
973 :
974 : /* Everything ok, form a new tuple. */
975 98 : memset(values, 0, sizeof(values));
976 98 : memset(nulls, false, sizeof(nulls));
977 98 : memset(replaces, false, sizeof(replaces));
978 :
979 98 : if (publish_given)
980 : {
981 28 : values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
982 28 : replaces[Anum_pg_publication_pubinsert - 1] = true;
983 :
984 28 : values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
985 28 : replaces[Anum_pg_publication_pubupdate - 1] = true;
986 :
987 28 : values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
988 28 : replaces[Anum_pg_publication_pubdelete - 1] = true;
989 :
990 28 : values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
991 28 : replaces[Anum_pg_publication_pubtruncate - 1] = true;
992 : }
993 :
994 98 : if (publish_via_partition_root_given)
995 : {
996 70 : values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
997 70 : replaces[Anum_pg_publication_pubviaroot - 1] = true;
998 : }
999 :
1000 98 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1001 : replaces);
1002 :
1003 : /* Update the catalog. */
1004 98 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1005 :
1006 98 : CommandCounterIncrement();
1007 :
1008 98 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1009 :
1010 : /* Invalidate the relcache. */
1011 98 : if (pubform->puballtables)
1012 : {
1013 8 : CacheInvalidateRelcacheAll();
1014 : }
1015 : else
1016 : {
1017 90 : List *relids = NIL;
1018 90 : List *schemarelids = NIL;
1019 :
1020 : /*
1021 : * For any partitioned tables contained in the publication, we must
1022 : * invalidate all partitions contained in the respective partition
1023 : * trees, not just those explicitly mentioned in the publication.
1024 : */
1025 90 : if (root_relids == NIL)
1026 54 : relids = GetPublicationRelations(pubform->oid,
1027 : PUBLICATION_PART_ALL);
1028 : else
1029 : {
1030 : /*
1031 : * We already got tables explicitly mentioned in the publication.
1032 : * Now get all partitions for the partitioned table in the list.
1033 : */
1034 72 : foreach(lc, root_relids)
1035 36 : relids = GetPubPartitionOptionRelations(relids,
1036 : PUBLICATION_PART_ALL,
1037 : lfirst_oid(lc));
1038 : }
1039 :
1040 90 : schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
1041 : PUBLICATION_PART_ALL);
1042 90 : relids = list_concat_unique_oid(relids, schemarelids);
1043 :
1044 90 : InvalidatePublicationRels(relids);
1045 : }
1046 :
1047 98 : ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
1048 98 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1049 : (Node *) stmt);
1050 :
1051 98 : InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
1052 98 : }
1053 :
1054 : /*
1055 : * Invalidate the relations.
1056 : */
1057 : void
1058 2178 : InvalidatePublicationRels(List *relids)
1059 : {
1060 : /*
1061 : * We don't want to send too many individual messages, at some point it's
1062 : * cheaper to just reset whole relcache.
1063 : */
1064 2178 : if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
1065 : {
1066 : ListCell *lc;
1067 :
1068 21012 : foreach(lc, relids)
1069 18834 : CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
1070 : }
1071 : else
1072 0 : CacheInvalidateRelcacheAll();
1073 2178 : }
1074 :
1075 : /*
1076 : * Add or remove table to/from publication.
1077 : */
1078 : static void
1079 848 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
1080 : List *tables, const char *queryString,
1081 : bool publish_schema)
1082 : {
1083 848 : List *rels = NIL;
1084 848 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1085 848 : Oid pubid = pubform->oid;
1086 :
1087 : /*
1088 : * Nothing to do if no objects, except in SET: for that it is quite
1089 : * possible that user has not specified any tables in which case we need
1090 : * to remove all the existing tables.
1091 : */
1092 848 : if (!tables && stmt->action != AP_SetObjects)
1093 66 : return;
1094 :
1095 782 : rels = OpenTableList(tables);
1096 :
1097 782 : if (stmt->action == AP_AddObjects)
1098 : {
1099 268 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1100 :
1101 250 : publish_schema |= is_schema_publication(pubid);
1102 :
1103 250 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1104 250 : pubform->pubviaroot);
1105 :
1106 238 : PublicationAddTables(pubid, rels, false, stmt);
1107 : }
1108 514 : else if (stmt->action == AP_DropObjects)
1109 92 : PublicationDropTables(pubid, rels, false);
1110 : else /* AP_SetObjects */
1111 : {
1112 422 : List *oldrelids = GetPublicationRelations(pubid,
1113 : PUBLICATION_PART_ROOT);
1114 422 : List *delrels = NIL;
1115 : ListCell *oldlc;
1116 :
1117 422 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1118 :
1119 404 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1120 404 : pubform->pubviaroot);
1121 :
1122 : /*
1123 : * To recreate the relation list for the publication, look for
1124 : * existing relations that do not need to be dropped.
1125 : */
1126 770 : foreach(oldlc, oldrelids)
1127 : {
1128 378 : Oid oldrelid = lfirst_oid(oldlc);
1129 : ListCell *newlc;
1130 : PublicationRelInfo *oldrel;
1131 378 : bool found = false;
1132 : HeapTuple rftuple;
1133 378 : Node *oldrelwhereclause = NULL;
1134 378 : Bitmapset *oldcolumns = NULL;
1135 :
1136 : /* look up the cache for the old relmap */
1137 378 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1138 : ObjectIdGetDatum(oldrelid),
1139 : ObjectIdGetDatum(pubid));
1140 :
1141 : /*
1142 : * See if the existing relation currently has a WHERE clause or a
1143 : * column list. We need to compare those too.
1144 : */
1145 378 : if (HeapTupleIsValid(rftuple))
1146 : {
1147 378 : bool isnull = true;
1148 : Datum whereClauseDatum;
1149 : Datum columnListDatum;
1150 :
1151 : /* Load the WHERE clause for this table. */
1152 378 : whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1153 : Anum_pg_publication_rel_prqual,
1154 : &isnull);
1155 378 : if (!isnull)
1156 210 : oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
1157 :
1158 : /* Transform the int2vector column list to a bitmap. */
1159 378 : columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1160 : Anum_pg_publication_rel_prattrs,
1161 : &isnull);
1162 :
1163 378 : if (!isnull)
1164 124 : oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
1165 :
1166 378 : ReleaseSysCache(rftuple);
1167 : }
1168 :
1169 742 : foreach(newlc, rels)
1170 : {
1171 : PublicationRelInfo *newpubrel;
1172 : Oid newrelid;
1173 380 : Bitmapset *newcolumns = NULL;
1174 :
1175 380 : newpubrel = (PublicationRelInfo *) lfirst(newlc);
1176 380 : newrelid = RelationGetRelid(newpubrel->relation);
1177 :
1178 : /*
1179 : * If the new publication has column list, transform it to a
1180 : * bitmap too.
1181 : */
1182 380 : if (newpubrel->columns)
1183 : {
1184 : ListCell *lc;
1185 :
1186 354 : foreach(lc, newpubrel->columns)
1187 : {
1188 218 : char *colname = strVal(lfirst(lc));
1189 218 : AttrNumber attnum = get_attnum(newrelid, colname);
1190 :
1191 218 : newcolumns = bms_add_member(newcolumns, attnum);
1192 : }
1193 : }
1194 :
1195 : /*
1196 : * Check if any of the new set of relations matches with the
1197 : * existing relations in the publication. Additionally, if the
1198 : * relation has an associated WHERE clause, check the WHERE
1199 : * expressions also match. Same for the column list. Drop the
1200 : * rest.
1201 : */
1202 380 : if (RelationGetRelid(newpubrel->relation) == oldrelid)
1203 : {
1204 260 : if (equal(oldrelwhereclause, newpubrel->whereClause) &&
1205 68 : bms_equal(oldcolumns, newcolumns))
1206 : {
1207 16 : found = true;
1208 16 : break;
1209 : }
1210 : }
1211 : }
1212 :
1213 : /*
1214 : * Add the non-matched relations to a list so that they can be
1215 : * dropped.
1216 : */
1217 378 : if (!found)
1218 : {
1219 362 : oldrel = palloc(sizeof(PublicationRelInfo));
1220 362 : oldrel->whereClause = NULL;
1221 362 : oldrel->columns = NIL;
1222 362 : oldrel->relation = table_open(oldrelid,
1223 : ShareUpdateExclusiveLock);
1224 362 : delrels = lappend(delrels, oldrel);
1225 : }
1226 : }
1227 :
1228 : /* And drop them. */
1229 392 : PublicationDropTables(pubid, delrels, true);
1230 :
1231 : /*
1232 : * Don't bother calculating the difference for adding, we'll catch and
1233 : * skip existing ones when doing catalog update.
1234 : */
1235 392 : PublicationAddTables(pubid, rels, true, stmt);
1236 :
1237 392 : CloseTableList(delrels);
1238 : }
1239 :
1240 668 : CloseTableList(rels);
1241 : }
1242 :
1243 : /*
1244 : * Alter the publication schemas.
1245 : *
1246 : * Add or remove schemas to/from publication.
1247 : */
1248 : static void
1249 734 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
1250 : HeapTuple tup, List *schemaidlist)
1251 : {
1252 734 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1253 :
1254 : /*
1255 : * Nothing to do if no objects, except in SET: for that it is quite
1256 : * possible that user has not specified any schemas in which case we need
1257 : * to remove all the existing schemas.
1258 : */
1259 734 : if (!schemaidlist && stmt->action != AP_SetObjects)
1260 276 : return;
1261 :
1262 : /*
1263 : * Schema lock is held until the publication is altered to prevent
1264 : * concurrent schema deletion.
1265 : */
1266 458 : LockSchemaList(schemaidlist);
1267 458 : if (stmt->action == AP_AddObjects)
1268 : {
1269 : ListCell *lc;
1270 : List *reloids;
1271 :
1272 28 : reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
1273 :
1274 38 : foreach(lc, reloids)
1275 : {
1276 : HeapTuple coltuple;
1277 :
1278 16 : coltuple = SearchSysCache2(PUBLICATIONRELMAP,
1279 : ObjectIdGetDatum(lfirst_oid(lc)),
1280 : ObjectIdGetDatum(pubform->oid));
1281 :
1282 16 : if (!HeapTupleIsValid(coltuple))
1283 0 : continue;
1284 :
1285 : /*
1286 : * Disallow adding schema if column list is already part of the
1287 : * publication. See CheckPubRelationColumnList.
1288 : */
1289 16 : if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
1290 6 : ereport(ERROR,
1291 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1292 : errmsg("cannot add schema to publication \"%s\"",
1293 : stmt->pubname),
1294 : errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1295 :
1296 10 : ReleaseSysCache(coltuple);
1297 : }
1298 :
1299 22 : PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
1300 : }
1301 430 : else if (stmt->action == AP_DropObjects)
1302 38 : PublicationDropSchemas(pubform->oid, schemaidlist, false);
1303 : else /* AP_SetObjects */
1304 : {
1305 392 : List *oldschemaids = GetPublicationSchemas(pubform->oid);
1306 392 : List *delschemas = NIL;
1307 :
1308 : /* Identify which schemas should be dropped */
1309 392 : delschemas = list_difference_oid(oldschemaids, schemaidlist);
1310 :
1311 : /*
1312 : * Schema lock is held until the publication is altered to prevent
1313 : * concurrent schema deletion.
1314 : */
1315 392 : LockSchemaList(delschemas);
1316 :
1317 : /* And drop them */
1318 392 : PublicationDropSchemas(pubform->oid, delschemas, true);
1319 :
1320 : /*
1321 : * Don't bother calculating the difference for adding, we'll catch and
1322 : * skip existing ones when doing catalog update.
1323 : */
1324 392 : PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
1325 : }
1326 : }
1327 :
1328 : /*
1329 : * Check if relations and schemas can be in a given publication and throw
1330 : * appropriate error if not.
1331 : */
1332 : static void
1333 890 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
1334 : List *tables, List *schemaidlist)
1335 : {
1336 890 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1337 :
1338 890 : if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
1339 94 : schemaidlist && !superuser())
1340 6 : ereport(ERROR,
1341 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1342 : errmsg("must be superuser to add or set schemas")));
1343 :
1344 : /*
1345 : * Check that user is allowed to manipulate the publication tables in
1346 : * schema
1347 : */
1348 884 : if (schemaidlist && pubform->puballtables)
1349 18 : ereport(ERROR,
1350 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1351 : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1352 : NameStr(pubform->pubname)),
1353 : errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
1354 :
1355 : /* Check that user is allowed to manipulate the publication tables. */
1356 866 : if (tables && pubform->puballtables)
1357 18 : ereport(ERROR,
1358 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1359 : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1360 : NameStr(pubform->pubname)),
1361 : errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
1362 848 : }
1363 :
1364 : /*
1365 : * Alter the existing publication.
1366 : *
1367 : * This is dispatcher function for AlterPublicationOptions,
1368 : * AlterPublicationSchemas and AlterPublicationTables.
1369 : */
1370 : void
1371 1018 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
1372 : {
1373 : Relation rel;
1374 : HeapTuple tup;
1375 : Form_pg_publication pubform;
1376 :
1377 1018 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1378 :
1379 1018 : tup = SearchSysCacheCopy1(PUBLICATIONNAME,
1380 : CStringGetDatum(stmt->pubname));
1381 :
1382 1018 : if (!HeapTupleIsValid(tup))
1383 0 : ereport(ERROR,
1384 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1385 : errmsg("publication \"%s\" does not exist",
1386 : stmt->pubname)));
1387 :
1388 1018 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1389 :
1390 : /* must be owner */
1391 1018 : if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
1392 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
1393 0 : stmt->pubname);
1394 :
1395 1018 : if (stmt->options)
1396 110 : AlterPublicationOptions(pstate, stmt, rel, tup);
1397 : else
1398 : {
1399 908 : List *relations = NIL;
1400 908 : List *schemaidlist = NIL;
1401 908 : Oid pubid = pubform->oid;
1402 :
1403 908 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
1404 : &schemaidlist);
1405 :
1406 890 : CheckAlterPublication(stmt, tup, relations, schemaidlist);
1407 :
1408 848 : heap_freetuple(tup);
1409 :
1410 : /* Lock the publication so nobody else can do anything with it. */
1411 848 : LockDatabaseObject(PublicationRelationId, pubid, 0,
1412 : AccessExclusiveLock);
1413 :
1414 : /*
1415 : * It is possible that by the time we acquire the lock on publication,
1416 : * concurrent DDL has removed it. We can test this by checking the
1417 : * existence of publication. We get the tuple again to avoid the risk
1418 : * of any publication option getting changed.
1419 : */
1420 848 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1421 848 : if (!HeapTupleIsValid(tup))
1422 0 : ereport(ERROR,
1423 : errcode(ERRCODE_UNDEFINED_OBJECT),
1424 : errmsg("publication \"%s\" does not exist",
1425 : stmt->pubname));
1426 :
1427 848 : AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
1428 : schemaidlist != NIL);
1429 734 : AlterPublicationSchemas(stmt, tup, schemaidlist);
1430 : }
1431 :
1432 : /* Cleanup. */
1433 814 : heap_freetuple(tup);
1434 814 : table_close(rel, RowExclusiveLock);
1435 814 : }
1436 :
1437 : /*
1438 : * Remove relation from publication by mapping OID.
1439 : */
1440 : void
1441 760 : RemovePublicationRelById(Oid proid)
1442 : {
1443 : Relation rel;
1444 : HeapTuple tup;
1445 : Form_pg_publication_rel pubrel;
1446 760 : List *relids = NIL;
1447 :
1448 760 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
1449 :
1450 760 : tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
1451 :
1452 760 : if (!HeapTupleIsValid(tup))
1453 0 : elog(ERROR, "cache lookup failed for publication table %u",
1454 : proid);
1455 :
1456 760 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1457 :
1458 : /*
1459 : * Invalidate relcache so that publication info is rebuilt.
1460 : *
1461 : * For the partitioned tables, we must invalidate all partitions contained
1462 : * in the respective partition hierarchies, not just the one explicitly
1463 : * mentioned in the publication. This is required because we implicitly
1464 : * publish the child tables when the parent table is published.
1465 : */
1466 760 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
1467 : pubrel->prrelid);
1468 :
1469 760 : InvalidatePublicationRels(relids);
1470 :
1471 760 : CatalogTupleDelete(rel, &tup->t_self);
1472 :
1473 760 : ReleaseSysCache(tup);
1474 :
1475 760 : table_close(rel, RowExclusiveLock);
1476 760 : }
1477 :
1478 : /*
1479 : * Remove the publication by mapping OID.
1480 : */
1481 : void
1482 370 : RemovePublicationById(Oid pubid)
1483 : {
1484 : Relation rel;
1485 : HeapTuple tup;
1486 : Form_pg_publication pubform;
1487 :
1488 370 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1489 :
1490 370 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1491 370 : if (!HeapTupleIsValid(tup))
1492 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1493 :
1494 370 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1495 :
1496 : /* Invalidate relcache so that publication info is rebuilt. */
1497 370 : if (pubform->puballtables)
1498 24 : CacheInvalidateRelcacheAll();
1499 :
1500 370 : CatalogTupleDelete(rel, &tup->t_self);
1501 :
1502 370 : ReleaseSysCache(tup);
1503 :
1504 370 : table_close(rel, RowExclusiveLock);
1505 370 : }
1506 :
1507 : /*
1508 : * Remove schema from publication by mapping OID.
1509 : */
1510 : void
1511 192 : RemovePublicationSchemaById(Oid psoid)
1512 : {
1513 : Relation rel;
1514 : HeapTuple tup;
1515 192 : List *schemaRels = NIL;
1516 : Form_pg_publication_namespace pubsch;
1517 :
1518 192 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
1519 :
1520 192 : tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
1521 :
1522 192 : if (!HeapTupleIsValid(tup))
1523 0 : elog(ERROR, "cache lookup failed for publication schema %u", psoid);
1524 :
1525 192 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1526 :
1527 : /*
1528 : * Invalidate relcache so that publication info is rebuilt. See
1529 : * RemovePublicationRelById for why we need to consider all the
1530 : * partitions.
1531 : */
1532 192 : schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
1533 : PUBLICATION_PART_ALL);
1534 192 : InvalidatePublicationRels(schemaRels);
1535 :
1536 192 : CatalogTupleDelete(rel, &tup->t_self);
1537 :
1538 192 : ReleaseSysCache(tup);
1539 :
1540 192 : table_close(rel, RowExclusiveLock);
1541 192 : }
1542 :
1543 : /*
1544 : * Open relations specified by a PublicationTable list.
1545 : * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1546 : * add them to a publication.
1547 : */
1548 : static List *
1549 1160 : OpenTableList(List *tables)
1550 : {
1551 1160 : List *relids = NIL;
1552 1160 : List *rels = NIL;
1553 : ListCell *lc;
1554 1160 : List *relids_with_rf = NIL;
1555 1160 : List *relids_with_collist = NIL;
1556 :
1557 : /*
1558 : * Open, share-lock, and check all the explicitly-specified relations
1559 : */
1560 2380 : foreach(lc, tables)
1561 : {
1562 1244 : PublicationTable *t = lfirst_node(PublicationTable, lc);
1563 1244 : bool recurse = t->relation->inh;
1564 : Relation rel;
1565 : Oid myrelid;
1566 : PublicationRelInfo *pub_rel;
1567 :
1568 : /* Allow query cancel in case this takes a long time */
1569 1244 : CHECK_FOR_INTERRUPTS();
1570 :
1571 1244 : rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
1572 1244 : myrelid = RelationGetRelid(rel);
1573 :
1574 : /*
1575 : * Filter out duplicates if user specifies "foo, foo".
1576 : *
1577 : * Note that this algorithm is known to not be very efficient (O(N^2))
1578 : * but given that it only works on list of tables given to us by user
1579 : * it's deemed acceptable.
1580 : */
1581 1244 : if (list_member_oid(relids, myrelid))
1582 : {
1583 : /* Disallow duplicate tables if there are any with row filters. */
1584 24 : if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
1585 12 : ereport(ERROR,
1586 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1587 : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1588 : RelationGetRelationName(rel))));
1589 :
1590 : /* Disallow duplicate tables if there are any with column lists. */
1591 12 : if (t->columns || list_member_oid(relids_with_collist, myrelid))
1592 12 : ereport(ERROR,
1593 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1594 : errmsg("conflicting or redundant column lists for table \"%s\"",
1595 : RelationGetRelationName(rel))));
1596 :
1597 0 : table_close(rel, ShareUpdateExclusiveLock);
1598 0 : continue;
1599 : }
1600 :
1601 1220 : pub_rel = palloc(sizeof(PublicationRelInfo));
1602 1220 : pub_rel->relation = rel;
1603 1220 : pub_rel->whereClause = t->whereClause;
1604 1220 : pub_rel->columns = t->columns;
1605 1220 : rels = lappend(rels, pub_rel);
1606 1220 : relids = lappend_oid(relids, myrelid);
1607 :
1608 1220 : if (t->whereClause)
1609 372 : relids_with_rf = lappend_oid(relids_with_rf, myrelid);
1610 :
1611 1220 : if (t->columns)
1612 332 : relids_with_collist = lappend_oid(relids_with_collist, myrelid);
1613 :
1614 : /*
1615 : * Add children of this rel, if requested, so that they too are added
1616 : * to the publication. A partitioned table can't have any inheritance
1617 : * children other than its partitions, which need not be explicitly
1618 : * added to the publication.
1619 : */
1620 1220 : if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1621 : {
1622 : List *children;
1623 : ListCell *child;
1624 :
1625 1038 : children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
1626 : NULL);
1627 :
1628 2084 : foreach(child, children)
1629 : {
1630 1046 : Oid childrelid = lfirst_oid(child);
1631 :
1632 : /* Allow query cancel in case this takes a long time */
1633 1046 : CHECK_FOR_INTERRUPTS();
1634 :
1635 : /*
1636 : * Skip duplicates if user specified both parent and child
1637 : * tables.
1638 : */
1639 1046 : if (list_member_oid(relids, childrelid))
1640 : {
1641 : /*
1642 : * We don't allow to specify row filter for both parent
1643 : * and child table at the same time as it is not very
1644 : * clear which one should be given preference.
1645 : */
1646 1038 : if (childrelid != myrelid &&
1647 0 : (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
1648 0 : ereport(ERROR,
1649 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1650 : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1651 : RelationGetRelationName(rel))));
1652 :
1653 : /*
1654 : * We don't allow to specify column list for both parent
1655 : * and child table at the same time as it is not very
1656 : * clear which one should be given preference.
1657 : */
1658 1038 : if (childrelid != myrelid &&
1659 0 : (t->columns || list_member_oid(relids_with_collist, childrelid)))
1660 0 : ereport(ERROR,
1661 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1662 : errmsg("conflicting or redundant column lists for table \"%s\"",
1663 : RelationGetRelationName(rel))));
1664 :
1665 1038 : continue;
1666 : }
1667 :
1668 : /* find_all_inheritors already got lock */
1669 8 : rel = table_open(childrelid, NoLock);
1670 8 : pub_rel = palloc(sizeof(PublicationRelInfo));
1671 8 : pub_rel->relation = rel;
1672 : /* child inherits WHERE clause from parent */
1673 8 : pub_rel->whereClause = t->whereClause;
1674 :
1675 : /* child inherits column list from parent */
1676 8 : pub_rel->columns = t->columns;
1677 8 : rels = lappend(rels, pub_rel);
1678 8 : relids = lappend_oid(relids, childrelid);
1679 :
1680 8 : if (t->whereClause)
1681 2 : relids_with_rf = lappend_oid(relids_with_rf, childrelid);
1682 :
1683 8 : if (t->columns)
1684 0 : relids_with_collist = lappend_oid(relids_with_collist, childrelid);
1685 : }
1686 : }
1687 : }
1688 :
1689 1136 : list_free(relids);
1690 1136 : list_free(relids_with_rf);
1691 :
1692 1136 : return rels;
1693 : }
1694 :
1695 : /*
1696 : * Close all relations in the list.
1697 : */
1698 : static void
1699 1358 : CloseTableList(List *rels)
1700 : {
1701 : ListCell *lc;
1702 :
1703 2754 : foreach(lc, rels)
1704 : {
1705 : PublicationRelInfo *pub_rel;
1706 :
1707 1396 : pub_rel = (PublicationRelInfo *) lfirst(lc);
1708 1396 : table_close(pub_rel->relation, NoLock);
1709 : }
1710 :
1711 1358 : list_free_deep(rels);
1712 1358 : }
1713 :
1714 : /*
1715 : * Lock the schemas specified in the schema list in AccessShareLock mode in
1716 : * order to prevent concurrent schema deletion.
1717 : */
1718 : static void
1719 984 : LockSchemaList(List *schemalist)
1720 : {
1721 : ListCell *lc;
1722 :
1723 1268 : foreach(lc, schemalist)
1724 : {
1725 284 : Oid schemaid = lfirst_oid(lc);
1726 :
1727 : /* Allow query cancel in case this takes a long time */
1728 284 : CHECK_FOR_INTERRUPTS();
1729 284 : LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
1730 :
1731 : /*
1732 : * It is possible that by the time we acquire the lock on schema,
1733 : * concurrent DDL has removed it. We can test this by checking the
1734 : * existence of schema.
1735 : */
1736 284 : if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
1737 0 : ereport(ERROR,
1738 : errcode(ERRCODE_UNDEFINED_SCHEMA),
1739 : errmsg("schema with OID %u does not exist", schemaid));
1740 : }
1741 984 : }
1742 :
1743 : /*
1744 : * Add listed tables to the publication.
1745 : */
1746 : static void
1747 954 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
1748 : AlterPublicationStmt *stmt)
1749 : {
1750 : ListCell *lc;
1751 :
1752 : Assert(!stmt || !stmt->for_all_tables);
1753 :
1754 1908 : foreach(lc, rels)
1755 : {
1756 1016 : PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
1757 1016 : Relation rel = pub_rel->relation;
1758 : ObjectAddress obj;
1759 :
1760 : /* Must be owner of the table or superuser. */
1761 1016 : if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
1762 6 : aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
1763 6 : RelationGetRelationName(rel));
1764 :
1765 1010 : obj = publication_add_relation(pubid, pub_rel, if_not_exists);
1766 954 : if (stmt)
1767 : {
1768 594 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1769 : (Node *) stmt);
1770 :
1771 594 : InvokeObjectPostCreateHook(PublicationRelRelationId,
1772 : obj.objectId, 0);
1773 : }
1774 : }
1775 892 : }
1776 :
1777 : /*
1778 : * Remove listed tables from the publication.
1779 : */
1780 : static void
1781 484 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
1782 : {
1783 : ObjectAddress obj;
1784 : ListCell *lc;
1785 : Oid prid;
1786 :
1787 926 : foreach(lc, rels)
1788 : {
1789 460 : PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
1790 460 : Relation rel = pubrel->relation;
1791 460 : Oid relid = RelationGetRelid(rel);
1792 :
1793 460 : if (pubrel->columns)
1794 0 : ereport(ERROR,
1795 : errcode(ERRCODE_SYNTAX_ERROR),
1796 : errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
1797 :
1798 460 : prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
1799 : ObjectIdGetDatum(relid),
1800 : ObjectIdGetDatum(pubid));
1801 460 : if (!OidIsValid(prid))
1802 : {
1803 12 : if (missing_ok)
1804 0 : continue;
1805 :
1806 12 : ereport(ERROR,
1807 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1808 : errmsg("relation \"%s\" is not part of the publication",
1809 : RelationGetRelationName(rel))));
1810 : }
1811 :
1812 448 : if (pubrel->whereClause)
1813 6 : ereport(ERROR,
1814 : (errcode(ERRCODE_SYNTAX_ERROR),
1815 : errmsg("cannot use a WHERE clause when removing a table from a publication")));
1816 :
1817 442 : ObjectAddressSet(obj, PublicationRelRelationId, prid);
1818 442 : performDeletion(&obj, DROP_CASCADE, 0);
1819 : }
1820 466 : }
1821 :
1822 : /*
1823 : * Add listed schemas to the publication.
1824 : */
1825 : static void
1826 548 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
1827 : AlterPublicationStmt *stmt)
1828 : {
1829 : ListCell *lc;
1830 :
1831 : Assert(!stmt || !stmt->for_all_tables);
1832 :
1833 758 : foreach(lc, schemas)
1834 : {
1835 222 : Oid schemaid = lfirst_oid(lc);
1836 : ObjectAddress obj;
1837 :
1838 222 : obj = publication_add_schema(pubid, schemaid, if_not_exists);
1839 210 : if (stmt)
1840 : {
1841 58 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1842 : (Node *) stmt);
1843 :
1844 58 : InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
1845 : obj.objectId, 0);
1846 : }
1847 : }
1848 536 : }
1849 :
1850 : /*
1851 : * Remove listed schemas from the publication.
1852 : */
1853 : static void
1854 430 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
1855 : {
1856 : ObjectAddress obj;
1857 : ListCell *lc;
1858 : Oid psid;
1859 :
1860 480 : foreach(lc, schemas)
1861 : {
1862 56 : Oid schemaid = lfirst_oid(lc);
1863 :
1864 56 : psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
1865 : Anum_pg_publication_namespace_oid,
1866 : ObjectIdGetDatum(schemaid),
1867 : ObjectIdGetDatum(pubid));
1868 56 : if (!OidIsValid(psid))
1869 : {
1870 6 : if (missing_ok)
1871 0 : continue;
1872 :
1873 6 : ereport(ERROR,
1874 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1875 : errmsg("tables from schema \"%s\" are not part of the publication",
1876 : get_namespace_name(schemaid))));
1877 : }
1878 :
1879 50 : ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
1880 50 : performDeletion(&obj, DROP_CASCADE, 0);
1881 : }
1882 424 : }
1883 :
1884 : /*
1885 : * Internal workhorse for changing a publication owner
1886 : */
1887 : static void
1888 26 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
1889 : {
1890 : Form_pg_publication form;
1891 :
1892 26 : form = (Form_pg_publication) GETSTRUCT(tup);
1893 :
1894 26 : if (form->pubowner == newOwnerId)
1895 2 : return;
1896 :
1897 24 : if (!superuser())
1898 : {
1899 : AclResult aclresult;
1900 :
1901 : /* Must be owner */
1902 12 : if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
1903 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
1904 0 : NameStr(form->pubname));
1905 :
1906 : /* Must be able to become new owner */
1907 12 : check_can_set_role(GetUserId(), newOwnerId);
1908 :
1909 : /* New owner must have CREATE privilege on database */
1910 12 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
1911 12 : if (aclresult != ACLCHECK_OK)
1912 0 : aclcheck_error(aclresult, OBJECT_DATABASE,
1913 0 : get_database_name(MyDatabaseId));
1914 :
1915 12 : if (form->puballtables && !superuser_arg(newOwnerId))
1916 0 : ereport(ERROR,
1917 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1918 : errmsg("permission denied to change owner of publication \"%s\"",
1919 : NameStr(form->pubname)),
1920 : errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
1921 :
1922 12 : if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid))
1923 6 : ereport(ERROR,
1924 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1925 : errmsg("permission denied to change owner of publication \"%s\"",
1926 : NameStr(form->pubname)),
1927 : errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
1928 : }
1929 :
1930 18 : form->pubowner = newOwnerId;
1931 18 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1932 :
1933 : /* Update owner dependency reference */
1934 18 : changeDependencyOnOwner(PublicationRelationId,
1935 : form->oid,
1936 : newOwnerId);
1937 :
1938 18 : InvokeObjectPostAlterHook(PublicationRelationId,
1939 : form->oid, 0);
1940 : }
1941 :
1942 : /*
1943 : * Change publication owner -- by name
1944 : */
1945 : ObjectAddress
1946 26 : AlterPublicationOwner(const char *name, Oid newOwnerId)
1947 : {
1948 : Oid subid;
1949 : HeapTuple tup;
1950 : Relation rel;
1951 : ObjectAddress address;
1952 : Form_pg_publication pubform;
1953 :
1954 26 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1955 :
1956 26 : tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
1957 :
1958 26 : if (!HeapTupleIsValid(tup))
1959 0 : ereport(ERROR,
1960 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1961 : errmsg("publication \"%s\" does not exist", name)));
1962 :
1963 26 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1964 26 : subid = pubform->oid;
1965 :
1966 26 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
1967 :
1968 20 : ObjectAddressSet(address, PublicationRelationId, subid);
1969 :
1970 20 : heap_freetuple(tup);
1971 :
1972 20 : table_close(rel, RowExclusiveLock);
1973 :
1974 20 : return address;
1975 : }
1976 :
1977 : /*
1978 : * Change publication owner -- by OID
1979 : */
1980 : void
1981 0 : AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
1982 : {
1983 : HeapTuple tup;
1984 : Relation rel;
1985 :
1986 0 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1987 :
1988 0 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
1989 :
1990 0 : if (!HeapTupleIsValid(tup))
1991 0 : ereport(ERROR,
1992 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1993 : errmsg("publication with OID %u does not exist", subid)));
1994 :
1995 0 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
1996 :
1997 0 : heap_freetuple(tup);
1998 :
1999 0 : table_close(rel, RowExclusiveLock);
2000 0 : }
|