Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * publicationcmds.c
4 : * publication manipulation
5 : *
6 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/publicationcmds.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/htup_details.h"
18 : #include "access/table.h"
19 : #include "access/xact.h"
20 : #include "catalog/catalog.h"
21 : #include "catalog/indexing.h"
22 : #include "catalog/namespace.h"
23 : #include "catalog/objectaccess.h"
24 : #include "catalog/objectaddress.h"
25 : #include "catalog/pg_database.h"
26 : #include "catalog/pg_inherits.h"
27 : #include "catalog/pg_namespace.h"
28 : #include "catalog/pg_proc.h"
29 : #include "catalog/pg_publication.h"
30 : #include "catalog/pg_publication_namespace.h"
31 : #include "catalog/pg_publication_rel.h"
32 : #include "commands/dbcommands.h"
33 : #include "commands/defrem.h"
34 : #include "commands/event_trigger.h"
35 : #include "commands/publicationcmds.h"
36 : #include "miscadmin.h"
37 : #include "nodes/nodeFuncs.h"
38 : #include "parser/parse_clause.h"
39 : #include "parser/parse_collate.h"
40 : #include "parser/parse_relation.h"
41 : #include "storage/lmgr.h"
42 : #include "utils/acl.h"
43 : #include "utils/builtins.h"
44 : #include "utils/inval.h"
45 : #include "utils/lsyscache.h"
46 : #include "utils/rel.h"
47 : #include "utils/syscache.h"
48 : #include "utils/varlena.h"
49 :
50 :
51 : /*
52 : * Information used to validate the columns in the row filter expression. See
53 : * contain_invalid_rfcolumn_walker for details.
54 : */
55 : typedef struct rf_context
56 : {
57 : Bitmapset *bms_replident; /* bitset of replica identity columns */
58 : bool pubviaroot; /* true if we are validating the parent
59 : * relation's row filter */
60 : Oid relid; /* relid of the relation */
61 : Oid parentid; /* relid of the parent relation */
62 : } rf_context;
63 :
64 : static List *OpenTableList(List *tables);
65 : static void CloseTableList(List *rels);
66 : static void LockSchemaList(List *schemalist);
67 : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
68 : AlterPublicationStmt *stmt);
69 : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
70 : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
71 : AlterPublicationStmt *stmt);
72 : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
73 :
74 :
75 : static void
76 834 : parse_publication_options(ParseState *pstate,
77 : List *options,
78 : bool *publish_given,
79 : PublicationActions *pubactions,
80 : bool *publish_via_partition_root_given,
81 : bool *publish_via_partition_root,
82 : bool *publish_generated_columns_given,
83 : bool *publish_generated_columns)
84 : {
85 : ListCell *lc;
86 :
87 834 : *publish_given = false;
88 834 : *publish_via_partition_root_given = false;
89 834 : *publish_generated_columns_given = false;
90 :
91 : /* defaults */
92 834 : pubactions->pubinsert = true;
93 834 : pubactions->pubupdate = true;
94 834 : pubactions->pubdelete = true;
95 834 : pubactions->pubtruncate = true;
96 834 : *publish_via_partition_root = false;
97 834 : *publish_generated_columns = false;
98 :
99 : /* Parse options */
100 1152 : foreach(lc, options)
101 : {
102 348 : DefElem *defel = (DefElem *) lfirst(lc);
103 :
104 348 : if (strcmp(defel->defname, "publish") == 0)
105 : {
106 : char *publish;
107 : List *publish_list;
108 : ListCell *lc2;
109 :
110 122 : if (*publish_given)
111 0 : errorConflictingDefElem(defel, pstate);
112 :
113 : /*
114 : * If publish option was given only the explicitly listed actions
115 : * should be published.
116 : */
117 122 : pubactions->pubinsert = false;
118 122 : pubactions->pubupdate = false;
119 122 : pubactions->pubdelete = false;
120 122 : pubactions->pubtruncate = false;
121 :
122 122 : *publish_given = true;
123 122 : publish = defGetString(defel);
124 :
125 122 : if (!SplitIdentifierString(publish, ',', &publish_list))
126 0 : ereport(ERROR,
127 : (errcode(ERRCODE_SYNTAX_ERROR),
128 : errmsg("invalid list syntax in parameter \"%s\"",
129 : "publish")));
130 :
131 : /* Process the option list. */
132 270 : foreach(lc2, publish_list)
133 : {
134 154 : char *publish_opt = (char *) lfirst(lc2);
135 :
136 154 : if (strcmp(publish_opt, "insert") == 0)
137 108 : pubactions->pubinsert = true;
138 46 : else if (strcmp(publish_opt, "update") == 0)
139 20 : pubactions->pubupdate = true;
140 26 : else if (strcmp(publish_opt, "delete") == 0)
141 10 : pubactions->pubdelete = true;
142 16 : else if (strcmp(publish_opt, "truncate") == 0)
143 10 : pubactions->pubtruncate = true;
144 : else
145 6 : ereport(ERROR,
146 : (errcode(ERRCODE_SYNTAX_ERROR),
147 : errmsg("unrecognized value for publication option \"%s\": \"%s\"",
148 : "publish", publish_opt)));
149 : }
150 : }
151 226 : else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
152 : {
153 162 : if (*publish_via_partition_root_given)
154 6 : errorConflictingDefElem(defel, pstate);
155 156 : *publish_via_partition_root_given = true;
156 156 : *publish_via_partition_root = defGetBoolean(defel);
157 : }
158 64 : else if (strcmp(defel->defname, "publish_generated_columns") == 0)
159 : {
160 58 : if (*publish_generated_columns_given)
161 6 : errorConflictingDefElem(defel, pstate);
162 52 : *publish_generated_columns_given = true;
163 52 : *publish_generated_columns = defGetBoolean(defel);
164 : }
165 : else
166 6 : ereport(ERROR,
167 : (errcode(ERRCODE_SYNTAX_ERROR),
168 : errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
169 : }
170 804 : }
171 :
172 : /*
173 : * Convert the PublicationObjSpecType list into schema oid list and
174 : * PublicationTable list.
175 : */
176 : static void
177 1558 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
178 : List **rels, List **schemas)
179 : {
180 : ListCell *cell;
181 : PublicationObjSpec *pubobj;
182 :
183 1558 : if (!pubobjspec_list)
184 86 : return;
185 :
186 3108 : foreach(cell, pubobjspec_list)
187 : {
188 : Oid schemaid;
189 : List *search_path;
190 :
191 1672 : pubobj = (PublicationObjSpec *) lfirst(cell);
192 :
193 1672 : switch (pubobj->pubobjtype)
194 : {
195 1316 : case PUBLICATIONOBJ_TABLE:
196 1316 : *rels = lappend(*rels, pubobj->pubtable);
197 1316 : break;
198 332 : case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
199 332 : schemaid = get_namespace_oid(pubobj->name, false);
200 :
201 : /* Filter out duplicates if user specifies "sch1, sch1" */
202 302 : *schemas = list_append_unique_oid(*schemas, schemaid);
203 302 : break;
204 24 : case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
205 24 : search_path = fetch_search_path(false);
206 24 : if (search_path == NIL) /* nothing valid in search_path? */
207 6 : ereport(ERROR,
208 : errcode(ERRCODE_UNDEFINED_SCHEMA),
209 : errmsg("no schema has been selected for CURRENT_SCHEMA"));
210 :
211 18 : schemaid = linitial_oid(search_path);
212 18 : list_free(search_path);
213 :
214 : /* Filter out duplicates if user specifies "sch1, sch1" */
215 18 : *schemas = list_append_unique_oid(*schemas, schemaid);
216 18 : break;
217 0 : default:
218 : /* shouldn't happen */
219 0 : elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
220 : break;
221 : }
222 : }
223 : }
224 :
225 : /*
226 : * Returns true if any of the columns used in the row filter WHERE expression is
227 : * not part of REPLICA IDENTITY, false otherwise.
228 : */
229 : static bool
230 258 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
231 : {
232 258 : if (node == NULL)
233 0 : return false;
234 :
235 258 : if (IsA(node, Var))
236 : {
237 104 : Var *var = (Var *) node;
238 104 : AttrNumber attnum = var->varattno;
239 :
240 : /*
241 : * If pubviaroot is true, we are validating the row filter of the
242 : * parent table, but the bitmap contains the replica identity
243 : * information of the child table. So, get the column number of the
244 : * child table as parent and child column order could be different.
245 : */
246 104 : if (context->pubviaroot)
247 : {
248 16 : char *colname = get_attname(context->parentid, attnum, false);
249 :
250 16 : attnum = get_attnum(context->relid, colname);
251 : }
252 :
253 104 : if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
254 104 : context->bms_replident))
255 60 : return true;
256 : }
257 :
258 198 : return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
259 : (void *) context);
260 : }
261 :
262 : /*
263 : * Check if all columns referenced in the filter expression are part of the
264 : * REPLICA IDENTITY index or not.
265 : *
266 : * Returns true if any invalid column is found.
267 : */
268 : bool
269 682 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
270 : bool pubviaroot)
271 : {
272 : HeapTuple rftuple;
273 682 : Oid relid = RelationGetRelid(relation);
274 682 : Oid publish_as_relid = RelationGetRelid(relation);
275 682 : bool result = false;
276 : Datum rfdatum;
277 : bool rfisnull;
278 :
279 : /*
280 : * FULL means all columns are in the REPLICA IDENTITY, so all columns are
281 : * allowed in the row filter and we can skip the validation.
282 : */
283 682 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
284 164 : return false;
285 :
286 : /*
287 : * For a partition, if pubviaroot is true, find the topmost ancestor that
288 : * is published via this publication as we need to use its row filter
289 : * expression to filter the partition's changes.
290 : *
291 : * Note that even though the row filter used is for an ancestor, the
292 : * REPLICA IDENTITY used will be for the actual child table.
293 : */
294 518 : if (pubviaroot && relation->rd_rel->relispartition)
295 : {
296 : publish_as_relid
297 108 : = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
298 :
299 108 : if (!OidIsValid(publish_as_relid))
300 6 : publish_as_relid = relid;
301 : }
302 :
303 518 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
304 : ObjectIdGetDatum(publish_as_relid),
305 : ObjectIdGetDatum(pubid));
306 :
307 518 : if (!HeapTupleIsValid(rftuple))
308 56 : return false;
309 :
310 462 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
311 : Anum_pg_publication_rel_prqual,
312 : &rfisnull);
313 :
314 462 : if (!rfisnull)
315 : {
316 102 : rf_context context = {0};
317 : Node *rfnode;
318 102 : Bitmapset *bms = NULL;
319 :
320 102 : context.pubviaroot = pubviaroot;
321 102 : context.parentid = publish_as_relid;
322 102 : context.relid = relid;
323 :
324 : /* Remember columns that are part of the REPLICA IDENTITY */
325 102 : bms = RelationGetIndexAttrBitmap(relation,
326 : INDEX_ATTR_BITMAP_IDENTITY_KEY);
327 :
328 102 : context.bms_replident = bms;
329 102 : rfnode = stringToNode(TextDatumGetCString(rfdatum));
330 102 : result = contain_invalid_rfcolumn_walker(rfnode, &context);
331 : }
332 :
333 462 : ReleaseSysCache(rftuple);
334 :
335 462 : return result;
336 : }
337 :
338 : /*
339 : * Check if all columns referenced in the REPLICA IDENTITY are covered by
340 : * the column list.
341 : *
342 : * Returns true if any replica identity column is not covered by column list.
343 : */
344 : bool
345 682 : pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
346 : bool pubviaroot)
347 : {
348 : HeapTuple tuple;
349 682 : Oid relid = RelationGetRelid(relation);
350 682 : Oid publish_as_relid = RelationGetRelid(relation);
351 682 : bool result = false;
352 : Datum datum;
353 : bool isnull;
354 :
355 : /*
356 : * For a partition, if pubviaroot is true, find the topmost ancestor that
357 : * is published via this publication as we need to use its column list for
358 : * the changes.
359 : *
360 : * Note that even though the column list used is for an ancestor, the
361 : * REPLICA IDENTITY used will be for the actual child table.
362 : */
363 682 : if (pubviaroot && relation->rd_rel->relispartition)
364 : {
365 130 : publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
366 :
367 130 : if (!OidIsValid(publish_as_relid))
368 6 : publish_as_relid = relid;
369 : }
370 :
371 682 : tuple = SearchSysCache2(PUBLICATIONRELMAP,
372 : ObjectIdGetDatum(publish_as_relid),
373 : ObjectIdGetDatum(pubid));
374 :
375 682 : if (!HeapTupleIsValid(tuple))
376 64 : return false;
377 :
378 618 : datum = SysCacheGetAttr(PUBLICATIONRELMAP, tuple,
379 : Anum_pg_publication_rel_prattrs,
380 : &isnull);
381 :
382 618 : if (!isnull)
383 : {
384 : int x;
385 : Bitmapset *idattrs;
386 230 : Bitmapset *columns = NULL;
387 :
388 : /* With REPLICA IDENTITY FULL, no column list is allowed. */
389 230 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
390 36 : result = true;
391 :
392 : /* Transform the column list datum to a bitmapset. */
393 230 : columns = pub_collist_to_bitmapset(NULL, datum, NULL);
394 :
395 : /* Remember columns that are part of the REPLICA IDENTITY */
396 230 : idattrs = RelationGetIndexAttrBitmap(relation,
397 : INDEX_ATTR_BITMAP_IDENTITY_KEY);
398 :
399 : /*
400 : * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are
401 : * offset (to handle system columns the usual way), while column list
402 : * does not use offset, so we can't do bms_is_subset(). Instead, we
403 : * have to loop over the idattrs and check all of them are in the
404 : * list.
405 : */
406 230 : x = -1;
407 350 : while ((x = bms_next_member(idattrs, x)) >= 0)
408 : {
409 192 : AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber);
410 :
411 : /*
412 : * If pubviaroot is true, we are validating the column list of the
413 : * parent table, but the bitmap contains the replica identity
414 : * information of the child table. The parent/child attnums may
415 : * not match, so translate them to the parent - get the attname
416 : * from the child, and look it up in the parent.
417 : */
418 192 : if (pubviaroot)
419 : {
420 : /* attribute name in the child table */
421 80 : char *colname = get_attname(relid, attnum, false);
422 :
423 : /*
424 : * Determine the attnum for the attribute name in parent (we
425 : * are using the column list defined on the parent).
426 : */
427 80 : attnum = get_attnum(publish_as_relid, colname);
428 : }
429 :
430 : /* replica identity column, not covered by the column list */
431 192 : if (!bms_is_member(attnum, columns))
432 : {
433 72 : result = true;
434 72 : break;
435 : }
436 : }
437 :
438 230 : bms_free(idattrs);
439 230 : bms_free(columns);
440 : }
441 :
442 618 : ReleaseSysCache(tuple);
443 :
444 618 : return result;
445 : }
446 :
447 : /* check_functions_in_node callback */
448 : static bool
449 396 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
450 : {
451 396 : return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
452 : func_id >= FirstNormalObjectId);
453 : }
454 :
455 : /*
456 : * The row filter walker checks if the row filter expression is a "simple
457 : * expression".
458 : *
459 : * It allows only simple or compound expressions such as:
460 : * - (Var Op Const)
461 : * - (Var Op Var)
462 : * - (Var Op Const) AND/OR (Var Op Const)
463 : * - etc
464 : * (where Var is a column of the table this filter belongs to)
465 : *
466 : * The simple expression has the following restrictions:
467 : * - User-defined operators are not allowed;
468 : * - User-defined functions are not allowed;
469 : * - User-defined types are not allowed;
470 : * - User-defined collations are not allowed;
471 : * - Non-immutable built-in functions are not allowed;
472 : * - System columns are not allowed.
473 : *
474 : * NOTES
475 : *
476 : * We don't allow user-defined functions/operators/types/collations because
477 : * (a) if a user drops a user-defined object used in a row filter expression or
478 : * if there is any other error while using it, the logical decoding
479 : * infrastructure won't be able to recover from such an error even if the
480 : * object is recreated again because a historic snapshot is used to evaluate
481 : * the row filter;
482 : * (b) a user-defined function can be used to access tables that could have
483 : * unpleasant results because a historic snapshot is used. That's why only
484 : * immutable built-in functions are allowed in row filter expressions.
485 : *
486 : * We don't allow system columns because currently, we don't have that
487 : * information in the tuple passed to downstream. Also, as we don't replicate
488 : * those to subscribers, there doesn't seem to be a need for a filter on those
489 : * columns.
490 : *
491 : * We can allow other node types after more analysis and testing.
492 : */
493 : static bool
494 1328 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
495 : {
496 1328 : char *errdetail_msg = NULL;
497 :
498 1328 : if (node == NULL)
499 6 : return false;
500 :
501 1322 : switch (nodeTag(node))
502 : {
503 356 : case T_Var:
504 : /* System columns are not allowed. */
505 356 : if (((Var *) node)->varattno < InvalidAttrNumber)
506 6 : errdetail_msg = _("System columns are not allowed.");
507 356 : break;
508 352 : case T_OpExpr:
509 : case T_DistinctExpr:
510 : case T_NullIfExpr:
511 : /* OK, except user-defined operators are not allowed. */
512 352 : if (((OpExpr *) node)->opno >= FirstNormalObjectId)
513 6 : errdetail_msg = _("User-defined operators are not allowed.");
514 352 : break;
515 6 : case T_ScalarArrayOpExpr:
516 : /* OK, except user-defined operators are not allowed. */
517 6 : if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
518 0 : errdetail_msg = _("User-defined operators are not allowed.");
519 :
520 : /*
521 : * We don't need to check the hashfuncid and negfuncid of
522 : * ScalarArrayOpExpr as those functions are only built for a
523 : * subquery.
524 : */
525 6 : break;
526 6 : case T_RowCompareExpr:
527 : {
528 : ListCell *opid;
529 :
530 : /* OK, except user-defined operators are not allowed. */
531 18 : foreach(opid, ((RowCompareExpr *) node)->opnos)
532 : {
533 12 : if (lfirst_oid(opid) >= FirstNormalObjectId)
534 : {
535 0 : errdetail_msg = _("User-defined operators are not allowed.");
536 0 : break;
537 : }
538 : }
539 : }
540 6 : break;
541 596 : case T_Const:
542 : case T_FuncExpr:
543 : case T_BoolExpr:
544 : case T_RelabelType:
545 : case T_CollateExpr:
546 : case T_CaseExpr:
547 : case T_CaseTestExpr:
548 : case T_ArrayExpr:
549 : case T_RowExpr:
550 : case T_CoalesceExpr:
551 : case T_MinMaxExpr:
552 : case T_XmlExpr:
553 : case T_NullTest:
554 : case T_BooleanTest:
555 : case T_List:
556 : /* OK, supported */
557 596 : break;
558 6 : default:
559 6 : errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
560 6 : break;
561 : }
562 :
563 : /*
564 : * For all the supported nodes, if we haven't already found a problem,
565 : * check the types, functions, and collations used in it. We check List
566 : * by walking through each element.
567 : */
568 1322 : if (!errdetail_msg && !IsA(node, List))
569 : {
570 1250 : if (exprType(node) >= FirstNormalObjectId)
571 6 : errdetail_msg = _("User-defined types are not allowed.");
572 1244 : else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
573 : (void *) pstate))
574 12 : errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
575 2464 : else if (exprCollation(node) >= FirstNormalObjectId ||
576 1232 : exprInputCollation(node) >= FirstNormalObjectId)
577 6 : errdetail_msg = _("User-defined collations are not allowed.");
578 : }
579 :
580 : /*
581 : * If we found a problem in this node, throw error now. Otherwise keep
582 : * going.
583 : */
584 1322 : if (errdetail_msg)
585 42 : ereport(ERROR,
586 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
587 : errmsg("invalid publication WHERE expression"),
588 : errdetail_internal("%s", errdetail_msg),
589 : parser_errposition(pstate, exprLocation(node))));
590 :
591 1280 : return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
592 : (void *) pstate);
593 : }
594 :
595 : /*
596 : * Check if the row filter expression is a "simple expression".
597 : *
598 : * See check_simple_rowfilter_expr_walker for details.
599 : */
600 : static bool
601 344 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
602 : {
603 344 : return check_simple_rowfilter_expr_walker(node, pstate);
604 : }
605 :
606 : /*
607 : * Transform the publication WHERE expression for all the relations in the list,
608 : * ensuring it is coerced to boolean and necessary collation information is
609 : * added if required, and add a new nsitem/RTE for the associated relation to
610 : * the ParseState's namespace list.
611 : *
612 : * Also check the publication row filter expression and throw an error if
613 : * anything not permitted or unexpected is encountered.
614 : */
615 : static void
616 1092 : TransformPubWhereClauses(List *tables, const char *queryString,
617 : bool pubviaroot)
618 : {
619 : ListCell *lc;
620 :
621 2186 : foreach(lc, tables)
622 : {
623 : ParseNamespaceItem *nsitem;
624 1154 : Node *whereclause = NULL;
625 : ParseState *pstate;
626 1154 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
627 :
628 1154 : if (pri->whereClause == NULL)
629 792 : continue;
630 :
631 : /*
632 : * If the publication doesn't publish changes via the root partitioned
633 : * table, the partition's row filter will be used. So disallow using
634 : * WHERE clause on partitioned table in this case.
635 : */
636 362 : if (!pubviaroot &&
637 340 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
638 6 : ereport(ERROR,
639 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
640 : errmsg("cannot use publication WHERE clause for relation \"%s\"",
641 : RelationGetRelationName(pri->relation)),
642 : errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
643 : "publish_via_partition_root")));
644 :
645 : /*
646 : * A fresh pstate is required so that we only have "this" table in its
647 : * rangetable
648 : */
649 356 : pstate = make_parsestate(NULL);
650 356 : pstate->p_sourcetext = queryString;
651 356 : nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
652 : AccessShareLock, NULL,
653 : false, false);
654 356 : addNSItemToQuery(pstate, nsitem, false, true, true);
655 :
656 356 : whereclause = transformWhereClause(pstate,
657 356 : copyObject(pri->whereClause),
658 : EXPR_KIND_WHERE,
659 : "PUBLICATION WHERE");
660 :
661 : /* Fix up collation information */
662 344 : assign_expr_collations(pstate, whereclause);
663 :
664 : /*
665 : * We allow only simple expressions in row filters. See
666 : * check_simple_rowfilter_expr_walker.
667 : */
668 344 : check_simple_rowfilter_expr(whereclause, pstate);
669 :
670 302 : free_parsestate(pstate);
671 :
672 302 : pri->whereClause = whereclause;
673 : }
674 1032 : }
675 :
676 :
677 : /*
678 : * Given a list of tables that are going to be added to a publication,
679 : * verify that they fulfill the necessary preconditions, namely: no tables
680 : * have a column list if any schema is published; and partitioned tables do
681 : * not have column lists if publish_via_partition_root is not set.
682 : *
683 : * 'publish_schema' indicates that the publication contains any TABLES IN
684 : * SCHEMA elements (newly added in this command, or preexisting).
685 : * 'pubviaroot' is the value of publish_via_partition_root.
686 : */
687 : static void
688 1032 : CheckPubRelationColumnList(char *pubname, List *tables,
689 : bool publish_schema, bool pubviaroot)
690 : {
691 : ListCell *lc;
692 :
693 2096 : foreach(lc, tables)
694 : {
695 1094 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
696 :
697 1094 : if (pri->columns == NIL)
698 722 : continue;
699 :
700 : /*
701 : * Disallow specifying column list if any schema is in the
702 : * publication.
703 : *
704 : * XXX We could instead just forbid the case when the publication
705 : * tries to publish the table with a column list and a schema for that
706 : * table. However, if we do that then we need a restriction during
707 : * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
708 : * seem to be a good idea.
709 : */
710 372 : if (publish_schema)
711 24 : ereport(ERROR,
712 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
713 : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
714 : get_namespace_name(RelationGetNamespace(pri->relation)),
715 : RelationGetRelationName(pri->relation), pubname),
716 : errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
717 :
718 : /*
719 : * If the publication doesn't publish changes via the root partitioned
720 : * table, the partition's column list will be used. So disallow using
721 : * a column list on the partitioned table in this case.
722 : */
723 348 : if (!pubviaroot &&
724 272 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
725 6 : ereport(ERROR,
726 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
727 : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
728 : get_namespace_name(RelationGetNamespace(pri->relation)),
729 : RelationGetRelationName(pri->relation), pubname),
730 : errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
731 : "publish_via_partition_root")));
732 : }
733 1002 : }
734 :
735 : /*
736 : * Create new publication.
737 : */
738 : ObjectAddress
739 730 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
740 : {
741 : Relation rel;
742 : ObjectAddress myself;
743 : Oid puboid;
744 : bool nulls[Natts_pg_publication];
745 : Datum values[Natts_pg_publication];
746 : HeapTuple tup;
747 : bool publish_given;
748 : PublicationActions pubactions;
749 : bool publish_via_partition_root_given;
750 : bool publish_via_partition_root;
751 : bool publish_generated_columns_given;
752 : bool publish_generated_columns;
753 : AclResult aclresult;
754 730 : List *relations = NIL;
755 730 : List *schemaidlist = NIL;
756 :
757 : /* must have CREATE privilege on database */
758 730 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
759 730 : if (aclresult != ACLCHECK_OK)
760 6 : aclcheck_error(aclresult, OBJECT_DATABASE,
761 6 : get_database_name(MyDatabaseId));
762 :
763 : /* FOR ALL TABLES requires superuser */
764 724 : if (stmt->for_all_tables && !superuser())
765 0 : ereport(ERROR,
766 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
767 : errmsg("must be superuser to create FOR ALL TABLES publication")));
768 :
769 724 : rel = table_open(PublicationRelationId, RowExclusiveLock);
770 :
771 : /* Check if name is used */
772 724 : puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
773 : CStringGetDatum(stmt->pubname));
774 724 : if (OidIsValid(puboid))
775 6 : ereport(ERROR,
776 : (errcode(ERRCODE_DUPLICATE_OBJECT),
777 : errmsg("publication \"%s\" already exists",
778 : stmt->pubname)));
779 :
780 : /* Form a tuple. */
781 718 : memset(values, 0, sizeof(values));
782 718 : memset(nulls, false, sizeof(nulls));
783 :
784 718 : values[Anum_pg_publication_pubname - 1] =
785 718 : DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
786 718 : values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
787 :
788 718 : parse_publication_options(pstate,
789 : stmt->options,
790 : &publish_given, &pubactions,
791 : &publish_via_partition_root_given,
792 : &publish_via_partition_root,
793 : &publish_generated_columns_given,
794 : &publish_generated_columns);
795 :
796 688 : puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
797 : Anum_pg_publication_oid);
798 688 : values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
799 688 : values[Anum_pg_publication_puballtables - 1] =
800 688 : BoolGetDatum(stmt->for_all_tables);
801 688 : values[Anum_pg_publication_pubinsert - 1] =
802 688 : BoolGetDatum(pubactions.pubinsert);
803 688 : values[Anum_pg_publication_pubupdate - 1] =
804 688 : BoolGetDatum(pubactions.pubupdate);
805 688 : values[Anum_pg_publication_pubdelete - 1] =
806 688 : BoolGetDatum(pubactions.pubdelete);
807 688 : values[Anum_pg_publication_pubtruncate - 1] =
808 688 : BoolGetDatum(pubactions.pubtruncate);
809 688 : values[Anum_pg_publication_pubviaroot - 1] =
810 688 : BoolGetDatum(publish_via_partition_root);
811 688 : values[Anum_pg_publication_pubgencols - 1] =
812 688 : BoolGetDatum(publish_generated_columns);
813 :
814 688 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
815 :
816 : /* Insert tuple into catalog. */
817 688 : CatalogTupleInsert(rel, tup);
818 688 : heap_freetuple(tup);
819 :
820 688 : recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
821 :
822 688 : ObjectAddressSet(myself, PublicationRelationId, puboid);
823 :
824 : /* Make the changes visible. */
825 688 : CommandCounterIncrement();
826 :
827 : /* Associate objects with the publication. */
828 688 : if (stmt->for_all_tables)
829 : {
830 : /* Invalidate relcache so that publication info is rebuilt. */
831 74 : CacheInvalidateRelcacheAll();
832 : }
833 : else
834 : {
835 614 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
836 : &schemaidlist);
837 :
838 : /* FOR TABLES IN SCHEMA requires superuser */
839 596 : if (schemaidlist != NIL && !superuser())
840 6 : ereport(ERROR,
841 : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
842 : errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
843 :
844 590 : if (relations != NIL)
845 : {
846 : List *rels;
847 :
848 396 : rels = OpenTableList(relations);
849 372 : TransformPubWhereClauses(rels, pstate->p_sourcetext,
850 : publish_via_partition_root);
851 :
852 348 : CheckPubRelationColumnList(stmt->pubname, rels,
853 : schemaidlist != NIL,
854 : publish_via_partition_root);
855 :
856 342 : PublicationAddTables(puboid, rels, true, NULL);
857 316 : CloseTableList(rels);
858 : }
859 :
860 510 : if (schemaidlist != NIL)
861 : {
862 : /*
863 : * Schema lock is held until the publication is created to prevent
864 : * concurrent schema deletion.
865 : */
866 134 : LockSchemaList(schemaidlist);
867 134 : PublicationAddSchemas(puboid, schemaidlist, true, NULL);
868 : }
869 : }
870 :
871 578 : table_close(rel, RowExclusiveLock);
872 :
873 578 : InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
874 :
875 578 : if (wal_level != WAL_LEVEL_LOGICAL)
876 332 : ereport(WARNING,
877 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
878 : errmsg("\"wal_level\" is insufficient to publish logical changes"),
879 : errhint("Set \"wal_level\" to \"logical\" before creating subscriptions.")));
880 :
881 578 : return myself;
882 : }
883 :
884 : /*
885 : * Change options of a publication.
886 : */
887 : static void
888 116 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
889 : Relation rel, HeapTuple tup)
890 : {
891 : bool nulls[Natts_pg_publication];
892 : bool replaces[Natts_pg_publication];
893 : Datum values[Natts_pg_publication];
894 : bool publish_given;
895 : PublicationActions pubactions;
896 : bool publish_via_partition_root_given;
897 : bool publish_via_partition_root;
898 : bool publish_generated_columns_given;
899 : bool publish_generated_columns;
900 : ObjectAddress obj;
901 : Form_pg_publication pubform;
902 116 : List *root_relids = NIL;
903 : ListCell *lc;
904 :
905 116 : parse_publication_options(pstate,
906 : stmt->options,
907 : &publish_given, &pubactions,
908 : &publish_via_partition_root_given,
909 : &publish_via_partition_root,
910 : &publish_generated_columns_given,
911 : &publish_generated_columns);
912 :
913 116 : pubform = (Form_pg_publication) GETSTRUCT(tup);
914 :
915 : /*
916 : * If the publication doesn't publish changes via the root partitioned
917 : * table, the partition's row filter and column list will be used. So
918 : * disallow using WHERE clause and column lists on partitioned table in
919 : * this case.
920 : */
921 116 : if (!pubform->puballtables && publish_via_partition_root_given &&
922 80 : !publish_via_partition_root)
923 : {
924 : /*
925 : * Lock the publication so nobody else can do anything with it. This
926 : * prevents concurrent alter to add partitioned table(s) with WHERE
927 : * clause(s) and/or column lists which we don't allow when not
928 : * publishing via root.
929 : */
930 48 : LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
931 : AccessShareLock);
932 :
933 48 : root_relids = GetPublicationRelations(pubform->oid,
934 : PUBLICATION_PART_ROOT);
935 :
936 84 : foreach(lc, root_relids)
937 : {
938 48 : Oid relid = lfirst_oid(lc);
939 : HeapTuple rftuple;
940 : char relkind;
941 : char *relname;
942 : bool has_rowfilter;
943 : bool has_collist;
944 :
945 : /*
946 : * Beware: we don't have lock on the relations, so cope silently
947 : * with the cache lookups returning NULL.
948 : */
949 :
950 48 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
951 : ObjectIdGetDatum(relid),
952 : ObjectIdGetDatum(pubform->oid));
953 48 : if (!HeapTupleIsValid(rftuple))
954 0 : continue;
955 48 : has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
956 48 : has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
957 48 : if (!has_rowfilter && !has_collist)
958 : {
959 12 : ReleaseSysCache(rftuple);
960 12 : continue;
961 : }
962 :
963 36 : relkind = get_rel_relkind(relid);
964 36 : if (relkind != RELKIND_PARTITIONED_TABLE)
965 : {
966 24 : ReleaseSysCache(rftuple);
967 24 : continue;
968 : }
969 12 : relname = get_rel_name(relid);
970 12 : if (relname == NULL) /* table concurrently dropped */
971 : {
972 0 : ReleaseSysCache(rftuple);
973 0 : continue;
974 : }
975 :
976 12 : if (has_rowfilter)
977 6 : ereport(ERROR,
978 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
979 : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
980 : "publish_via_partition_root",
981 : stmt->pubname),
982 : errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
983 : relname, "publish_via_partition_root")));
984 : Assert(has_collist);
985 6 : ereport(ERROR,
986 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
987 : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
988 : "publish_via_partition_root",
989 : stmt->pubname),
990 : errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
991 : relname, "publish_via_partition_root")));
992 : }
993 : }
994 :
995 : /* Everything ok, form a new tuple. */
996 104 : memset(values, 0, sizeof(values));
997 104 : memset(nulls, false, sizeof(nulls));
998 104 : memset(replaces, false, sizeof(replaces));
999 :
1000 104 : if (publish_given)
1001 : {
1002 28 : values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
1003 28 : replaces[Anum_pg_publication_pubinsert - 1] = true;
1004 :
1005 28 : values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
1006 28 : replaces[Anum_pg_publication_pubupdate - 1] = true;
1007 :
1008 28 : values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
1009 28 : replaces[Anum_pg_publication_pubdelete - 1] = true;
1010 :
1011 28 : values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
1012 28 : replaces[Anum_pg_publication_pubtruncate - 1] = true;
1013 : }
1014 :
1015 104 : if (publish_via_partition_root_given)
1016 : {
1017 70 : values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
1018 70 : replaces[Anum_pg_publication_pubviaroot - 1] = true;
1019 : }
1020 :
1021 104 : if (publish_generated_columns_given)
1022 : {
1023 6 : values[Anum_pg_publication_pubgencols - 1] = BoolGetDatum(publish_generated_columns);
1024 6 : replaces[Anum_pg_publication_pubgencols - 1] = true;
1025 : }
1026 :
1027 104 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1028 : replaces);
1029 :
1030 : /* Update the catalog. */
1031 104 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1032 :
1033 104 : CommandCounterIncrement();
1034 :
1035 104 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1036 :
1037 : /* Invalidate the relcache. */
1038 104 : if (pubform->puballtables)
1039 : {
1040 8 : CacheInvalidateRelcacheAll();
1041 : }
1042 : else
1043 : {
1044 96 : List *relids = NIL;
1045 96 : List *schemarelids = NIL;
1046 :
1047 : /*
1048 : * For any partitioned tables contained in the publication, we must
1049 : * invalidate all partitions contained in the respective partition
1050 : * trees, not just those explicitly mentioned in the publication.
1051 : */
1052 96 : if (root_relids == NIL)
1053 60 : relids = GetPublicationRelations(pubform->oid,
1054 : PUBLICATION_PART_ALL);
1055 : else
1056 : {
1057 : /*
1058 : * We already got tables explicitly mentioned in the publication.
1059 : * Now get all partitions for the partitioned table in the list.
1060 : */
1061 72 : foreach(lc, root_relids)
1062 36 : relids = GetPubPartitionOptionRelations(relids,
1063 : PUBLICATION_PART_ALL,
1064 : lfirst_oid(lc));
1065 : }
1066 :
1067 96 : schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
1068 : PUBLICATION_PART_ALL);
1069 96 : relids = list_concat_unique_oid(relids, schemarelids);
1070 :
1071 96 : InvalidatePublicationRels(relids);
1072 : }
1073 :
1074 104 : ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
1075 104 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1076 : (Node *) stmt);
1077 :
1078 104 : InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
1079 104 : }
1080 :
1081 : /*
1082 : * Invalidate the relations.
1083 : */
1084 : void
1085 2256 : InvalidatePublicationRels(List *relids)
1086 : {
1087 : /*
1088 : * We don't want to send too many individual messages, at some point it's
1089 : * cheaper to just reset whole relcache.
1090 : */
1091 2256 : if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
1092 : {
1093 : ListCell *lc;
1094 :
1095 19028 : foreach(lc, relids)
1096 16772 : CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
1097 : }
1098 : else
1099 0 : CacheInvalidateRelcacheAll();
1100 2256 : }
1101 :
1102 : /*
1103 : * Add or remove table to/from publication.
1104 : */
1105 : static void
1106 884 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
1107 : List *tables, const char *queryString,
1108 : bool publish_schema)
1109 : {
1110 884 : List *rels = NIL;
1111 884 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1112 884 : Oid pubid = pubform->oid;
1113 :
1114 : /*
1115 : * Nothing to do if no objects, except in SET: for that it is quite
1116 : * possible that user has not specified any tables in which case we need
1117 : * to remove all the existing tables.
1118 : */
1119 884 : if (!tables && stmt->action != AP_SetObjects)
1120 66 : return;
1121 :
1122 818 : rels = OpenTableList(tables);
1123 :
1124 818 : if (stmt->action == AP_AddObjects)
1125 : {
1126 274 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1127 :
1128 256 : publish_schema |= is_schema_publication(pubid);
1129 :
1130 256 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1131 256 : pubform->pubviaroot);
1132 :
1133 244 : PublicationAddTables(pubid, rels, false, stmt);
1134 : }
1135 544 : else if (stmt->action == AP_DropObjects)
1136 98 : PublicationDropTables(pubid, rels, false);
1137 : else /* AP_SetObjects */
1138 : {
1139 446 : List *oldrelids = GetPublicationRelations(pubid,
1140 : PUBLICATION_PART_ROOT);
1141 446 : List *delrels = NIL;
1142 : ListCell *oldlc;
1143 :
1144 446 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1145 :
1146 428 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1147 428 : pubform->pubviaroot);
1148 :
1149 : /*
1150 : * To recreate the relation list for the publication, look for
1151 : * existing relations that do not need to be dropped.
1152 : */
1153 806 : foreach(oldlc, oldrelids)
1154 : {
1155 402 : Oid oldrelid = lfirst_oid(oldlc);
1156 : ListCell *newlc;
1157 : PublicationRelInfo *oldrel;
1158 402 : bool found = false;
1159 : HeapTuple rftuple;
1160 402 : Node *oldrelwhereclause = NULL;
1161 402 : Bitmapset *oldcolumns = NULL;
1162 :
1163 : /* look up the cache for the old relmap */
1164 402 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1165 : ObjectIdGetDatum(oldrelid),
1166 : ObjectIdGetDatum(pubid));
1167 :
1168 : /*
1169 : * See if the existing relation currently has a WHERE clause or a
1170 : * column list. We need to compare those too.
1171 : */
1172 402 : if (HeapTupleIsValid(rftuple))
1173 : {
1174 402 : bool isnull = true;
1175 : Datum whereClauseDatum;
1176 : Datum columnListDatum;
1177 :
1178 : /* Load the WHERE clause for this table. */
1179 402 : whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1180 : Anum_pg_publication_rel_prqual,
1181 : &isnull);
1182 402 : if (!isnull)
1183 210 : oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
1184 :
1185 : /* Transform the int2vector column list to a bitmap. */
1186 402 : columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1187 : Anum_pg_publication_rel_prattrs,
1188 : &isnull);
1189 :
1190 402 : if (!isnull)
1191 136 : oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
1192 :
1193 402 : ReleaseSysCache(rftuple);
1194 : }
1195 :
1196 778 : foreach(newlc, rels)
1197 : {
1198 : PublicationRelInfo *newpubrel;
1199 : Oid newrelid;
1200 404 : Bitmapset *newcolumns = NULL;
1201 :
1202 404 : newpubrel = (PublicationRelInfo *) lfirst(newlc);
1203 404 : newrelid = RelationGetRelid(newpubrel->relation);
1204 :
1205 : /*
1206 : * Validate the column list. If the column list or WHERE
1207 : * clause changes, then the validation done here will be
1208 : * duplicated inside PublicationAddTables(). The validation
1209 : * is cheap enough that that seems harmless.
1210 : */
1211 404 : newcolumns = pub_collist_validate(newpubrel->relation,
1212 : newpubrel->columns);
1213 :
1214 : /*
1215 : * Check if any of the new set of relations matches with the
1216 : * existing relations in the publication. Additionally, if the
1217 : * relation has an associated WHERE clause, check the WHERE
1218 : * expressions also match. Same for the column list. Drop the
1219 : * rest.
1220 : */
1221 392 : if (newrelid == oldrelid)
1222 : {
1223 284 : if (equal(oldrelwhereclause, newpubrel->whereClause) &&
1224 80 : bms_equal(oldcolumns, newcolumns))
1225 : {
1226 16 : found = true;
1227 16 : break;
1228 : }
1229 : }
1230 : }
1231 :
1232 : /*
1233 : * Add the non-matched relations to a list so that they can be
1234 : * dropped.
1235 : */
1236 390 : if (!found)
1237 : {
1238 374 : oldrel = palloc(sizeof(PublicationRelInfo));
1239 374 : oldrel->whereClause = NULL;
1240 374 : oldrel->columns = NIL;
1241 374 : oldrel->relation = table_open(oldrelid,
1242 : ShareUpdateExclusiveLock);
1243 374 : delrels = lappend(delrels, oldrel);
1244 : }
1245 : }
1246 :
1247 : /* And drop them. */
1248 404 : PublicationDropTables(pubid, delrels, true);
1249 :
1250 : /*
1251 : * Don't bother calculating the difference for adding, we'll catch and
1252 : * skip existing ones when doing catalog update.
1253 : */
1254 404 : PublicationAddTables(pubid, rels, true, stmt);
1255 :
1256 404 : CloseTableList(delrels);
1257 : }
1258 :
1259 692 : CloseTableList(rels);
1260 : }
1261 :
1262 : /*
1263 : * Alter the publication schemas.
1264 : *
1265 : * Add or remove schemas to/from publication.
1266 : */
1267 : static void
1268 758 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
1269 : HeapTuple tup, List *schemaidlist)
1270 : {
1271 758 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1272 :
1273 : /*
1274 : * Nothing to do if no objects, except in SET: for that it is quite
1275 : * possible that user has not specified any schemas in which case we need
1276 : * to remove all the existing schemas.
1277 : */
1278 758 : if (!schemaidlist && stmt->action != AP_SetObjects)
1279 288 : return;
1280 :
1281 : /*
1282 : * Schema lock is held until the publication is altered to prevent
1283 : * concurrent schema deletion.
1284 : */
1285 470 : LockSchemaList(schemaidlist);
1286 470 : if (stmt->action == AP_AddObjects)
1287 : {
1288 : ListCell *lc;
1289 : List *reloids;
1290 :
1291 28 : reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
1292 :
1293 36 : foreach(lc, reloids)
1294 : {
1295 : HeapTuple coltuple;
1296 :
1297 14 : coltuple = SearchSysCache2(PUBLICATIONRELMAP,
1298 : ObjectIdGetDatum(lfirst_oid(lc)),
1299 : ObjectIdGetDatum(pubform->oid));
1300 :
1301 14 : if (!HeapTupleIsValid(coltuple))
1302 0 : continue;
1303 :
1304 : /*
1305 : * Disallow adding schema if column list is already part of the
1306 : * publication. See CheckPubRelationColumnList.
1307 : */
1308 14 : if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
1309 6 : ereport(ERROR,
1310 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1311 : errmsg("cannot add schema to publication \"%s\"",
1312 : stmt->pubname),
1313 : errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1314 :
1315 8 : ReleaseSysCache(coltuple);
1316 : }
1317 :
1318 22 : PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
1319 : }
1320 442 : else if (stmt->action == AP_DropObjects)
1321 38 : PublicationDropSchemas(pubform->oid, schemaidlist, false);
1322 : else /* AP_SetObjects */
1323 : {
1324 404 : List *oldschemaids = GetPublicationSchemas(pubform->oid);
1325 404 : List *delschemas = NIL;
1326 :
1327 : /* Identify which schemas should be dropped */
1328 404 : delschemas = list_difference_oid(oldschemaids, schemaidlist);
1329 :
1330 : /*
1331 : * Schema lock is held until the publication is altered to prevent
1332 : * concurrent schema deletion.
1333 : */
1334 404 : LockSchemaList(delschemas);
1335 :
1336 : /* And drop them */
1337 404 : PublicationDropSchemas(pubform->oid, delschemas, true);
1338 :
1339 : /*
1340 : * Don't bother calculating the difference for adding, we'll catch and
1341 : * skip existing ones when doing catalog update.
1342 : */
1343 404 : PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
1344 : }
1345 : }
1346 :
1347 : /*
1348 : * Check if relations and schemas can be in a given publication and throw
1349 : * appropriate error if not.
1350 : */
1351 : static void
1352 926 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
1353 : List *tables, List *schemaidlist)
1354 : {
1355 926 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1356 :
1357 926 : if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
1358 94 : schemaidlist && !superuser())
1359 6 : ereport(ERROR,
1360 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1361 : errmsg("must be superuser to add or set schemas")));
1362 :
1363 : /*
1364 : * Check that user is allowed to manipulate the publication tables in
1365 : * schema
1366 : */
1367 920 : if (schemaidlist && pubform->puballtables)
1368 18 : ereport(ERROR,
1369 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1370 : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1371 : NameStr(pubform->pubname)),
1372 : errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
1373 :
1374 : /* Check that user is allowed to manipulate the publication tables. */
1375 902 : if (tables && pubform->puballtables)
1376 18 : ereport(ERROR,
1377 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1378 : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1379 : NameStr(pubform->pubname)),
1380 : errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
1381 884 : }
1382 :
1383 : /*
1384 : * Alter the existing publication.
1385 : *
1386 : * This is dispatcher function for AlterPublicationOptions,
1387 : * AlterPublicationSchemas and AlterPublicationTables.
1388 : */
1389 : void
1390 1060 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
1391 : {
1392 : Relation rel;
1393 : HeapTuple tup;
1394 : Form_pg_publication pubform;
1395 :
1396 1060 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1397 :
1398 1060 : tup = SearchSysCacheCopy1(PUBLICATIONNAME,
1399 : CStringGetDatum(stmt->pubname));
1400 :
1401 1060 : if (!HeapTupleIsValid(tup))
1402 0 : ereport(ERROR,
1403 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1404 : errmsg("publication \"%s\" does not exist",
1405 : stmt->pubname)));
1406 :
1407 1060 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1408 :
1409 : /* must be owner */
1410 1060 : if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
1411 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
1412 0 : stmt->pubname);
1413 :
1414 1060 : if (stmt->options)
1415 116 : AlterPublicationOptions(pstate, stmt, rel, tup);
1416 : else
1417 : {
1418 944 : List *relations = NIL;
1419 944 : List *schemaidlist = NIL;
1420 944 : Oid pubid = pubform->oid;
1421 :
1422 944 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
1423 : &schemaidlist);
1424 :
1425 926 : CheckAlterPublication(stmt, tup, relations, schemaidlist);
1426 :
1427 884 : heap_freetuple(tup);
1428 :
1429 : /* Lock the publication so nobody else can do anything with it. */
1430 884 : LockDatabaseObject(PublicationRelationId, pubid, 0,
1431 : AccessExclusiveLock);
1432 :
1433 : /*
1434 : * It is possible that by the time we acquire the lock on publication,
1435 : * concurrent DDL has removed it. We can test this by checking the
1436 : * existence of publication. We get the tuple again to avoid the risk
1437 : * of any publication option getting changed.
1438 : */
1439 884 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1440 884 : if (!HeapTupleIsValid(tup))
1441 0 : ereport(ERROR,
1442 : errcode(ERRCODE_UNDEFINED_OBJECT),
1443 : errmsg("publication \"%s\" does not exist",
1444 : stmt->pubname));
1445 :
1446 884 : AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
1447 : schemaidlist != NIL);
1448 758 : AlterPublicationSchemas(stmt, tup, schemaidlist);
1449 : }
1450 :
1451 : /* Cleanup. */
1452 844 : heap_freetuple(tup);
1453 844 : table_close(rel, RowExclusiveLock);
1454 844 : }
1455 :
1456 : /*
1457 : * Remove relation from publication by mapping OID.
1458 : */
1459 : void
1460 796 : RemovePublicationRelById(Oid proid)
1461 : {
1462 : Relation rel;
1463 : HeapTuple tup;
1464 : Form_pg_publication_rel pubrel;
1465 796 : List *relids = NIL;
1466 :
1467 796 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
1468 :
1469 796 : tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
1470 :
1471 796 : if (!HeapTupleIsValid(tup))
1472 0 : elog(ERROR, "cache lookup failed for publication table %u",
1473 : proid);
1474 :
1475 796 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1476 :
1477 : /*
1478 : * Invalidate relcache so that publication info is rebuilt.
1479 : *
1480 : * For the partitioned tables, we must invalidate all partitions contained
1481 : * in the respective partition hierarchies, not just the one explicitly
1482 : * mentioned in the publication. This is required because we implicitly
1483 : * publish the child tables when the parent table is published.
1484 : */
1485 796 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
1486 : pubrel->prrelid);
1487 :
1488 796 : InvalidatePublicationRels(relids);
1489 :
1490 796 : CatalogTupleDelete(rel, &tup->t_self);
1491 :
1492 796 : ReleaseSysCache(tup);
1493 :
1494 796 : table_close(rel, RowExclusiveLock);
1495 796 : }
1496 :
1497 : /*
1498 : * Remove the publication by mapping OID.
1499 : */
1500 : void
1501 402 : RemovePublicationById(Oid pubid)
1502 : {
1503 : Relation rel;
1504 : HeapTuple tup;
1505 : Form_pg_publication pubform;
1506 :
1507 402 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1508 :
1509 402 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1510 402 : if (!HeapTupleIsValid(tup))
1511 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1512 :
1513 402 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1514 :
1515 : /* Invalidate relcache so that publication info is rebuilt. */
1516 402 : if (pubform->puballtables)
1517 38 : CacheInvalidateRelcacheAll();
1518 :
1519 402 : CatalogTupleDelete(rel, &tup->t_self);
1520 :
1521 402 : ReleaseSysCache(tup);
1522 :
1523 402 : table_close(rel, RowExclusiveLock);
1524 402 : }
1525 :
1526 : /*
1527 : * Remove schema from publication by mapping OID.
1528 : */
1529 : void
1530 192 : RemovePublicationSchemaById(Oid psoid)
1531 : {
1532 : Relation rel;
1533 : HeapTuple tup;
1534 192 : List *schemaRels = NIL;
1535 : Form_pg_publication_namespace pubsch;
1536 :
1537 192 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
1538 :
1539 192 : tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
1540 :
1541 192 : if (!HeapTupleIsValid(tup))
1542 0 : elog(ERROR, "cache lookup failed for publication schema %u", psoid);
1543 :
1544 192 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1545 :
1546 : /*
1547 : * Invalidate relcache so that publication info is rebuilt. See
1548 : * RemovePublicationRelById for why we need to consider all the
1549 : * partitions.
1550 : */
1551 192 : schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
1552 : PUBLICATION_PART_ALL);
1553 192 : InvalidatePublicationRels(schemaRels);
1554 :
1555 192 : CatalogTupleDelete(rel, &tup->t_self);
1556 :
1557 192 : ReleaseSysCache(tup);
1558 :
1559 192 : table_close(rel, RowExclusiveLock);
1560 192 : }
1561 :
1562 : /*
1563 : * Open relations specified by a PublicationTable list.
1564 : * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1565 : * add them to a publication.
1566 : */
1567 : static List *
1568 1214 : OpenTableList(List *tables)
1569 : {
1570 1214 : List *relids = NIL;
1571 1214 : List *rels = NIL;
1572 : ListCell *lc;
1573 1214 : List *relids_with_rf = NIL;
1574 1214 : List *relids_with_collist = NIL;
1575 :
1576 : /*
1577 : * Open, share-lock, and check all the explicitly-specified relations
1578 : */
1579 2488 : foreach(lc, tables)
1580 : {
1581 1298 : PublicationTable *t = lfirst_node(PublicationTable, lc);
1582 1298 : bool recurse = t->relation->inh;
1583 : Relation rel;
1584 : Oid myrelid;
1585 : PublicationRelInfo *pub_rel;
1586 :
1587 : /* Allow query cancel in case this takes a long time */
1588 1298 : CHECK_FOR_INTERRUPTS();
1589 :
1590 1298 : rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
1591 1298 : myrelid = RelationGetRelid(rel);
1592 :
1593 : /*
1594 : * Filter out duplicates if user specifies "foo, foo".
1595 : *
1596 : * Note that this algorithm is known to not be very efficient (O(N^2))
1597 : * but given that it only works on list of tables given to us by user
1598 : * it's deemed acceptable.
1599 : */
1600 1298 : if (list_member_oid(relids, myrelid))
1601 : {
1602 : /* Disallow duplicate tables if there are any with row filters. */
1603 24 : if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
1604 12 : ereport(ERROR,
1605 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1606 : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1607 : RelationGetRelationName(rel))));
1608 :
1609 : /* Disallow duplicate tables if there are any with column lists. */
1610 12 : if (t->columns || list_member_oid(relids_with_collist, myrelid))
1611 12 : ereport(ERROR,
1612 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1613 : errmsg("conflicting or redundant column lists for table \"%s\"",
1614 : RelationGetRelationName(rel))));
1615 :
1616 0 : table_close(rel, ShareUpdateExclusiveLock);
1617 0 : continue;
1618 : }
1619 :
1620 1274 : pub_rel = palloc(sizeof(PublicationRelInfo));
1621 1274 : pub_rel->relation = rel;
1622 1274 : pub_rel->whereClause = t->whereClause;
1623 1274 : pub_rel->columns = t->columns;
1624 1274 : rels = lappend(rels, pub_rel);
1625 1274 : relids = lappend_oid(relids, myrelid);
1626 :
1627 1274 : if (t->whereClause)
1628 372 : relids_with_rf = lappend_oid(relids_with_rf, myrelid);
1629 :
1630 1274 : if (t->columns)
1631 378 : relids_with_collist = lappend_oid(relids_with_collist, myrelid);
1632 :
1633 : /*
1634 : * Add children of this rel, if requested, so that they too are added
1635 : * to the publication. A partitioned table can't have any inheritance
1636 : * children other than its partitions, which need not be explicitly
1637 : * added to the publication.
1638 : */
1639 1274 : if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1640 : {
1641 : List *children;
1642 : ListCell *child;
1643 :
1644 1092 : children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
1645 : NULL);
1646 :
1647 2192 : foreach(child, children)
1648 : {
1649 1100 : Oid childrelid = lfirst_oid(child);
1650 :
1651 : /* Allow query cancel in case this takes a long time */
1652 1100 : CHECK_FOR_INTERRUPTS();
1653 :
1654 : /*
1655 : * Skip duplicates if user specified both parent and child
1656 : * tables.
1657 : */
1658 1100 : if (list_member_oid(relids, childrelid))
1659 : {
1660 : /*
1661 : * We don't allow to specify row filter for both parent
1662 : * and child table at the same time as it is not very
1663 : * clear which one should be given preference.
1664 : */
1665 1092 : if (childrelid != myrelid &&
1666 0 : (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
1667 0 : ereport(ERROR,
1668 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1669 : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1670 : RelationGetRelationName(rel))));
1671 :
1672 : /*
1673 : * We don't allow to specify column list for both parent
1674 : * and child table at the same time as it is not very
1675 : * clear which one should be given preference.
1676 : */
1677 1092 : if (childrelid != myrelid &&
1678 0 : (t->columns || list_member_oid(relids_with_collist, childrelid)))
1679 0 : ereport(ERROR,
1680 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1681 : errmsg("conflicting or redundant column lists for table \"%s\"",
1682 : RelationGetRelationName(rel))));
1683 :
1684 1092 : continue;
1685 : }
1686 :
1687 : /* find_all_inheritors already got lock */
1688 8 : rel = table_open(childrelid, NoLock);
1689 8 : pub_rel = palloc(sizeof(PublicationRelInfo));
1690 8 : pub_rel->relation = rel;
1691 : /* child inherits WHERE clause from parent */
1692 8 : pub_rel->whereClause = t->whereClause;
1693 :
1694 : /* child inherits column list from parent */
1695 8 : pub_rel->columns = t->columns;
1696 8 : rels = lappend(rels, pub_rel);
1697 8 : relids = lappend_oid(relids, childrelid);
1698 :
1699 8 : if (t->whereClause)
1700 2 : relids_with_rf = lappend_oid(relids_with_rf, childrelid);
1701 :
1702 8 : if (t->columns)
1703 0 : relids_with_collist = lappend_oid(relids_with_collist, childrelid);
1704 : }
1705 : }
1706 : }
1707 :
1708 1190 : list_free(relids);
1709 1190 : list_free(relids_with_rf);
1710 :
1711 1190 : return rels;
1712 : }
1713 :
1714 : /*
1715 : * Close all relations in the list.
1716 : */
1717 : static void
1718 1412 : CloseTableList(List *rels)
1719 : {
1720 : ListCell *lc;
1721 :
1722 2862 : foreach(lc, rels)
1723 : {
1724 : PublicationRelInfo *pub_rel;
1725 :
1726 1450 : pub_rel = (PublicationRelInfo *) lfirst(lc);
1727 1450 : table_close(pub_rel->relation, NoLock);
1728 : }
1729 :
1730 1412 : list_free_deep(rels);
1731 1412 : }
1732 :
1733 : /*
1734 : * Lock the schemas specified in the schema list in AccessShareLock mode in
1735 : * order to prevent concurrent schema deletion.
1736 : */
1737 : static void
1738 1008 : LockSchemaList(List *schemalist)
1739 : {
1740 : ListCell *lc;
1741 :
1742 1292 : foreach(lc, schemalist)
1743 : {
1744 284 : Oid schemaid = lfirst_oid(lc);
1745 :
1746 : /* Allow query cancel in case this takes a long time */
1747 284 : CHECK_FOR_INTERRUPTS();
1748 284 : LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
1749 :
1750 : /*
1751 : * It is possible that by the time we acquire the lock on schema,
1752 : * concurrent DDL has removed it. We can test this by checking the
1753 : * existence of schema.
1754 : */
1755 284 : if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
1756 0 : ereport(ERROR,
1757 : errcode(ERRCODE_UNDEFINED_SCHEMA),
1758 : errmsg("schema with OID %u does not exist", schemaid));
1759 : }
1760 1008 : }
1761 :
1762 : /*
1763 : * Add listed tables to the publication.
1764 : */
1765 : static void
1766 990 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
1767 : AlterPublicationStmt *stmt)
1768 : {
1769 : ListCell *lc;
1770 :
1771 : Assert(!stmt || !stmt->for_all_tables);
1772 :
1773 1980 : foreach(lc, rels)
1774 : {
1775 1052 : PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
1776 1052 : Relation rel = pub_rel->relation;
1777 : ObjectAddress obj;
1778 :
1779 : /* Must be owner of the table or superuser. */
1780 1052 : if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
1781 6 : aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
1782 6 : RelationGetRelationName(rel));
1783 :
1784 1046 : obj = publication_add_relation(pubid, pub_rel, if_not_exists);
1785 990 : if (stmt)
1786 : {
1787 612 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1788 : (Node *) stmt);
1789 :
1790 612 : InvokeObjectPostCreateHook(PublicationRelRelationId,
1791 : obj.objectId, 0);
1792 : }
1793 : }
1794 928 : }
1795 :
1796 : /*
1797 : * Remove listed tables from the publication.
1798 : */
1799 : static void
1800 502 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
1801 : {
1802 : ObjectAddress obj;
1803 : ListCell *lc;
1804 : Oid prid;
1805 :
1806 962 : foreach(lc, rels)
1807 : {
1808 478 : PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
1809 478 : Relation rel = pubrel->relation;
1810 478 : Oid relid = RelationGetRelid(rel);
1811 :
1812 478 : if (pubrel->columns)
1813 0 : ereport(ERROR,
1814 : errcode(ERRCODE_SYNTAX_ERROR),
1815 : errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
1816 :
1817 478 : prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
1818 : ObjectIdGetDatum(relid),
1819 : ObjectIdGetDatum(pubid));
1820 478 : if (!OidIsValid(prid))
1821 : {
1822 12 : if (missing_ok)
1823 0 : continue;
1824 :
1825 12 : ereport(ERROR,
1826 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1827 : errmsg("relation \"%s\" is not part of the publication",
1828 : RelationGetRelationName(rel))));
1829 : }
1830 :
1831 466 : if (pubrel->whereClause)
1832 6 : ereport(ERROR,
1833 : (errcode(ERRCODE_SYNTAX_ERROR),
1834 : errmsg("cannot use a WHERE clause when removing a table from a publication")));
1835 :
1836 460 : ObjectAddressSet(obj, PublicationRelRelationId, prid);
1837 460 : performDeletion(&obj, DROP_CASCADE, 0);
1838 : }
1839 484 : }
1840 :
1841 : /*
1842 : * Add listed schemas to the publication.
1843 : */
1844 : static void
1845 560 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
1846 : AlterPublicationStmt *stmt)
1847 : {
1848 : ListCell *lc;
1849 :
1850 : Assert(!stmt || !stmt->for_all_tables);
1851 :
1852 770 : foreach(lc, schemas)
1853 : {
1854 222 : Oid schemaid = lfirst_oid(lc);
1855 : ObjectAddress obj;
1856 :
1857 222 : obj = publication_add_schema(pubid, schemaid, if_not_exists);
1858 210 : if (stmt)
1859 : {
1860 58 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1861 : (Node *) stmt);
1862 :
1863 58 : InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
1864 : obj.objectId, 0);
1865 : }
1866 : }
1867 548 : }
1868 :
1869 : /*
1870 : * Remove listed schemas from the publication.
1871 : */
1872 : static void
1873 442 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
1874 : {
1875 : ObjectAddress obj;
1876 : ListCell *lc;
1877 : Oid psid;
1878 :
1879 492 : foreach(lc, schemas)
1880 : {
1881 56 : Oid schemaid = lfirst_oid(lc);
1882 :
1883 56 : psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
1884 : Anum_pg_publication_namespace_oid,
1885 : ObjectIdGetDatum(schemaid),
1886 : ObjectIdGetDatum(pubid));
1887 56 : if (!OidIsValid(psid))
1888 : {
1889 6 : if (missing_ok)
1890 0 : continue;
1891 :
1892 6 : ereport(ERROR,
1893 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1894 : errmsg("tables from schema \"%s\" are not part of the publication",
1895 : get_namespace_name(schemaid))));
1896 : }
1897 :
1898 50 : ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
1899 50 : performDeletion(&obj, DROP_CASCADE, 0);
1900 : }
1901 436 : }
1902 :
1903 : /*
1904 : * Internal workhorse for changing a publication owner
1905 : */
1906 : static void
1907 26 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
1908 : {
1909 : Form_pg_publication form;
1910 :
1911 26 : form = (Form_pg_publication) GETSTRUCT(tup);
1912 :
1913 26 : if (form->pubowner == newOwnerId)
1914 2 : return;
1915 :
1916 24 : if (!superuser())
1917 : {
1918 : AclResult aclresult;
1919 :
1920 : /* Must be owner */
1921 12 : if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
1922 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
1923 0 : NameStr(form->pubname));
1924 :
1925 : /* Must be able to become new owner */
1926 12 : check_can_set_role(GetUserId(), newOwnerId);
1927 :
1928 : /* New owner must have CREATE privilege on database */
1929 12 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
1930 12 : if (aclresult != ACLCHECK_OK)
1931 0 : aclcheck_error(aclresult, OBJECT_DATABASE,
1932 0 : get_database_name(MyDatabaseId));
1933 :
1934 12 : if (form->puballtables && !superuser_arg(newOwnerId))
1935 0 : ereport(ERROR,
1936 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1937 : errmsg("permission denied to change owner of publication \"%s\"",
1938 : NameStr(form->pubname)),
1939 : errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
1940 :
1941 12 : if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid))
1942 6 : ereport(ERROR,
1943 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1944 : errmsg("permission denied to change owner of publication \"%s\"",
1945 : NameStr(form->pubname)),
1946 : errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
1947 : }
1948 :
1949 18 : form->pubowner = newOwnerId;
1950 18 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1951 :
1952 : /* Update owner dependency reference */
1953 18 : changeDependencyOnOwner(PublicationRelationId,
1954 : form->oid,
1955 : newOwnerId);
1956 :
1957 18 : InvokeObjectPostAlterHook(PublicationRelationId,
1958 : form->oid, 0);
1959 : }
1960 :
1961 : /*
1962 : * Change publication owner -- by name
1963 : */
1964 : ObjectAddress
1965 26 : AlterPublicationOwner(const char *name, Oid newOwnerId)
1966 : {
1967 : Oid subid;
1968 : HeapTuple tup;
1969 : Relation rel;
1970 : ObjectAddress address;
1971 : Form_pg_publication pubform;
1972 :
1973 26 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1974 :
1975 26 : tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
1976 :
1977 26 : if (!HeapTupleIsValid(tup))
1978 0 : ereport(ERROR,
1979 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1980 : errmsg("publication \"%s\" does not exist", name)));
1981 :
1982 26 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1983 26 : subid = pubform->oid;
1984 :
1985 26 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
1986 :
1987 20 : ObjectAddressSet(address, PublicationRelationId, subid);
1988 :
1989 20 : heap_freetuple(tup);
1990 :
1991 20 : table_close(rel, RowExclusiveLock);
1992 :
1993 20 : return address;
1994 : }
1995 :
1996 : /*
1997 : * Change publication owner -- by OID
1998 : */
1999 : void
2000 0 : AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
2001 : {
2002 : HeapTuple tup;
2003 : Relation rel;
2004 :
2005 0 : rel = table_open(PublicationRelationId, RowExclusiveLock);
2006 :
2007 0 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
2008 :
2009 0 : if (!HeapTupleIsValid(tup))
2010 0 : ereport(ERROR,
2011 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2012 : errmsg("publication with OID %u does not exist", subid)));
2013 :
2014 0 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
2015 :
2016 0 : heap_freetuple(tup);
2017 :
2018 0 : table_close(rel, RowExclusiveLock);
2019 0 : }
|