Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * publicationcmds.c
4 : * publication manipulation
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/publicationcmds.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/htup_details.h"
18 : #include "access/table.h"
19 : #include "access/xact.h"
20 : #include "catalog/catalog.h"
21 : #include "catalog/indexing.h"
22 : #include "catalog/namespace.h"
23 : #include "catalog/objectaccess.h"
24 : #include "catalog/objectaddress.h"
25 : #include "catalog/pg_database.h"
26 : #include "catalog/pg_inherits.h"
27 : #include "catalog/pg_namespace.h"
28 : #include "catalog/pg_proc.h"
29 : #include "catalog/pg_publication.h"
30 : #include "catalog/pg_publication_namespace.h"
31 : #include "catalog/pg_publication_rel.h"
32 : #include "commands/dbcommands.h"
33 : #include "commands/defrem.h"
34 : #include "commands/event_trigger.h"
35 : #include "commands/publicationcmds.h"
36 : #include "miscadmin.h"
37 : #include "nodes/nodeFuncs.h"
38 : #include "parser/parse_clause.h"
39 : #include "parser/parse_collate.h"
40 : #include "parser/parse_relation.h"
41 : #include "rewrite/rewriteHandler.h"
42 : #include "storage/lmgr.h"
43 : #include "utils/acl.h"
44 : #include "utils/builtins.h"
45 : #include "utils/inval.h"
46 : #include "utils/lsyscache.h"
47 : #include "utils/rel.h"
48 : #include "utils/syscache.h"
49 : #include "utils/varlena.h"
50 :
51 :
52 : /*
53 : * Information used to validate the columns in the row filter expression. See
54 : * contain_invalid_rfcolumn_walker for details.
55 : */
56 : typedef struct rf_context
57 : {
58 : Bitmapset *bms_replident; /* bitset of replica identity columns */
59 : bool pubviaroot; /* true if we are validating the parent
60 : * relation's row filter */
61 : Oid relid; /* relid of the relation */
62 : Oid parentid; /* relid of the parent relation */
63 : } rf_context;
64 :
65 : static List *OpenTableList(List *tables);
66 : static void CloseTableList(List *rels);
67 : static void LockSchemaList(List *schemalist);
68 : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
69 : AlterPublicationStmt *stmt);
70 : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
71 : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
72 : AlterPublicationStmt *stmt);
73 : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
74 : static char defGetGeneratedColsOption(DefElem *def);
75 :
76 :
77 : static void
78 894 : parse_publication_options(ParseState *pstate,
79 : List *options,
80 : bool *publish_given,
81 : PublicationActions *pubactions,
82 : bool *publish_via_partition_root_given,
83 : bool *publish_via_partition_root,
84 : bool *publish_generated_columns_given,
85 : char *publish_generated_columns)
86 : {
87 : ListCell *lc;
88 :
89 894 : *publish_given = false;
90 894 : *publish_via_partition_root_given = false;
91 894 : *publish_generated_columns_given = false;
92 :
93 : /* defaults */
94 894 : pubactions->pubinsert = true;
95 894 : pubactions->pubupdate = true;
96 894 : pubactions->pubdelete = true;
97 894 : pubactions->pubtruncate = true;
98 894 : *publish_via_partition_root = false;
99 894 : *publish_generated_columns = PUBLISH_GENCOLS_NONE;
100 :
101 : /* Parse options */
102 1232 : foreach(lc, options)
103 : {
104 368 : DefElem *defel = (DefElem *) lfirst(lc);
105 :
106 368 : if (strcmp(defel->defname, "publish") == 0)
107 : {
108 : char *publish;
109 : List *publish_list;
110 : ListCell *lc2;
111 :
112 122 : if (*publish_given)
113 0 : errorConflictingDefElem(defel, pstate);
114 :
115 : /*
116 : * If publish option was given only the explicitly listed actions
117 : * should be published.
118 : */
119 122 : pubactions->pubinsert = false;
120 122 : pubactions->pubupdate = false;
121 122 : pubactions->pubdelete = false;
122 122 : pubactions->pubtruncate = false;
123 :
124 122 : *publish_given = true;
125 122 : publish = defGetString(defel);
126 :
127 122 : if (!SplitIdentifierString(publish, ',', &publish_list))
128 0 : ereport(ERROR,
129 : (errcode(ERRCODE_SYNTAX_ERROR),
130 : errmsg("invalid list syntax in parameter \"%s\"",
131 : "publish")));
132 :
133 : /* Process the option list. */
134 270 : foreach(lc2, publish_list)
135 : {
136 154 : char *publish_opt = (char *) lfirst(lc2);
137 :
138 154 : if (strcmp(publish_opt, "insert") == 0)
139 108 : pubactions->pubinsert = true;
140 46 : else if (strcmp(publish_opt, "update") == 0)
141 20 : pubactions->pubupdate = true;
142 26 : else if (strcmp(publish_opt, "delete") == 0)
143 10 : pubactions->pubdelete = true;
144 16 : else if (strcmp(publish_opt, "truncate") == 0)
145 10 : pubactions->pubtruncate = true;
146 : else
147 6 : ereport(ERROR,
148 : (errcode(ERRCODE_SYNTAX_ERROR),
149 : errmsg("unrecognized value for publication option \"%s\": \"%s\"",
150 : "publish", publish_opt)));
151 : }
152 : }
153 246 : else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
154 : {
155 164 : if (*publish_via_partition_root_given)
156 6 : errorConflictingDefElem(defel, pstate);
157 158 : *publish_via_partition_root_given = true;
158 158 : *publish_via_partition_root = defGetBoolean(defel);
159 : }
160 82 : else if (strcmp(defel->defname, "publish_generated_columns") == 0)
161 : {
162 76 : if (*publish_generated_columns_given)
163 6 : errorConflictingDefElem(defel, pstate);
164 70 : *publish_generated_columns_given = true;
165 70 : *publish_generated_columns = defGetGeneratedColsOption(defel);
166 : }
167 : else
168 6 : ereport(ERROR,
169 : (errcode(ERRCODE_SYNTAX_ERROR),
170 : errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
171 : }
172 864 : }
173 :
174 : /*
175 : * Convert the PublicationObjSpecType list into schema oid list and
176 : * PublicationTable list.
177 : */
178 : static void
179 1610 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
180 : List **rels, List **schemas)
181 : {
182 : ListCell *cell;
183 : PublicationObjSpec *pubobj;
184 :
185 1610 : if (!pubobjspec_list)
186 86 : return;
187 :
188 3210 : foreach(cell, pubobjspec_list)
189 : {
190 : Oid schemaid;
191 : List *search_path;
192 :
193 1722 : pubobj = (PublicationObjSpec *) lfirst(cell);
194 :
195 1722 : switch (pubobj->pubobjtype)
196 : {
197 1366 : case PUBLICATIONOBJ_TABLE:
198 1366 : *rels = lappend(*rels, pubobj->pubtable);
199 1366 : break;
200 332 : case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
201 332 : schemaid = get_namespace_oid(pubobj->name, false);
202 :
203 : /* Filter out duplicates if user specifies "sch1, sch1" */
204 302 : *schemas = list_append_unique_oid(*schemas, schemaid);
205 302 : break;
206 24 : case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
207 24 : search_path = fetch_search_path(false);
208 24 : if (search_path == NIL) /* nothing valid in search_path? */
209 6 : ereport(ERROR,
210 : errcode(ERRCODE_UNDEFINED_SCHEMA),
211 : errmsg("no schema has been selected for CURRENT_SCHEMA"));
212 :
213 18 : schemaid = linitial_oid(search_path);
214 18 : list_free(search_path);
215 :
216 : /* Filter out duplicates if user specifies "sch1, sch1" */
217 18 : *schemas = list_append_unique_oid(*schemas, schemaid);
218 18 : break;
219 0 : default:
220 : /* shouldn't happen */
221 0 : elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
222 : break;
223 : }
224 : }
225 : }
226 :
227 : /*
228 : * Returns true if any of the columns used in the row filter WHERE expression is
229 : * not part of REPLICA IDENTITY, false otherwise.
230 : */
231 : static bool
232 258 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
233 : {
234 258 : if (node == NULL)
235 0 : return false;
236 :
237 258 : if (IsA(node, Var))
238 : {
239 104 : Var *var = (Var *) node;
240 104 : AttrNumber attnum = var->varattno;
241 :
242 : /*
243 : * If pubviaroot is true, we are validating the row filter of the
244 : * parent table, but the bitmap contains the replica identity
245 : * information of the child table. So, get the column number of the
246 : * child table as parent and child column order could be different.
247 : */
248 104 : if (context->pubviaroot)
249 : {
250 16 : char *colname = get_attname(context->parentid, attnum, false);
251 :
252 16 : attnum = get_attnum(context->relid, colname);
253 : }
254 :
255 104 : if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
256 104 : context->bms_replident))
257 60 : return true;
258 : }
259 :
260 198 : return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
261 : context);
262 : }
263 :
264 : /*
265 : * Check if all columns referenced in the filter expression are part of the
266 : * REPLICA IDENTITY index or not.
267 : *
268 : * Returns true if any invalid column is found.
269 : */
270 : bool
271 710 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
272 : bool pubviaroot)
273 : {
274 : HeapTuple rftuple;
275 710 : Oid relid = RelationGetRelid(relation);
276 710 : Oid publish_as_relid = RelationGetRelid(relation);
277 710 : bool result = false;
278 : Datum rfdatum;
279 : bool rfisnull;
280 :
281 : /*
282 : * FULL means all columns are in the REPLICA IDENTITY, so all columns are
283 : * allowed in the row filter and we can skip the validation.
284 : */
285 710 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
286 186 : return false;
287 :
288 : /*
289 : * For a partition, if pubviaroot is true, find the topmost ancestor that
290 : * is published via this publication as we need to use its row filter
291 : * expression to filter the partition's changes.
292 : *
293 : * Note that even though the row filter used is for an ancestor, the
294 : * REPLICA IDENTITY used will be for the actual child table.
295 : */
296 524 : if (pubviaroot && relation->rd_rel->relispartition)
297 : {
298 : publish_as_relid
299 108 : = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
300 :
301 108 : if (!OidIsValid(publish_as_relid))
302 6 : publish_as_relid = relid;
303 : }
304 :
305 524 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
306 : ObjectIdGetDatum(publish_as_relid),
307 : ObjectIdGetDatum(pubid));
308 :
309 524 : if (!HeapTupleIsValid(rftuple))
310 56 : return false;
311 :
312 468 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
313 : Anum_pg_publication_rel_prqual,
314 : &rfisnull);
315 :
316 468 : if (!rfisnull)
317 : {
318 102 : rf_context context = {0};
319 : Node *rfnode;
320 102 : Bitmapset *bms = NULL;
321 :
322 102 : context.pubviaroot = pubviaroot;
323 102 : context.parentid = publish_as_relid;
324 102 : context.relid = relid;
325 :
326 : /* Remember columns that are part of the REPLICA IDENTITY */
327 102 : bms = RelationGetIndexAttrBitmap(relation,
328 : INDEX_ATTR_BITMAP_IDENTITY_KEY);
329 :
330 102 : context.bms_replident = bms;
331 102 : rfnode = stringToNode(TextDatumGetCString(rfdatum));
332 102 : result = contain_invalid_rfcolumn_walker(rfnode, &context);
333 : }
334 :
335 468 : ReleaseSysCache(rftuple);
336 :
337 468 : return result;
338 : }
339 :
340 : /*
341 : * Check for invalid columns in the publication table definition.
342 : *
343 : * This function evaluates two conditions:
344 : *
345 : * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
346 : * by the column list. If any column is missing, *invalid_column_list is set
347 : * to true.
348 : * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
349 : * are published, either by being explicitly named in the column list or, if
350 : * no column list is specified, by setting the option
351 : * publish_generated_columns to stored. If any unpublished
352 : * generated column is found, *invalid_gen_col is set to true.
353 : *
354 : * Returns true if any of the above conditions are not met.
355 : */
356 : bool
357 902 : pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
358 : bool pubviaroot, char pubgencols_type,
359 : bool *invalid_column_list,
360 : bool *invalid_gen_col)
361 : {
362 902 : Oid relid = RelationGetRelid(relation);
363 902 : Oid publish_as_relid = RelationGetRelid(relation);
364 : Bitmapset *idattrs;
365 902 : Bitmapset *columns = NULL;
366 902 : TupleDesc desc = RelationGetDescr(relation);
367 : Publication *pub;
368 : int x;
369 :
370 902 : *invalid_column_list = false;
371 902 : *invalid_gen_col = false;
372 :
373 : /*
374 : * For a partition, if pubviaroot is true, find the topmost ancestor that
375 : * is published via this publication as we need to use its column list for
376 : * the changes.
377 : *
378 : * Note that even though the column list used is for an ancestor, the
379 : * REPLICA IDENTITY used will be for the actual child table.
380 : */
381 902 : if (pubviaroot && relation->rd_rel->relispartition)
382 : {
383 160 : publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
384 :
385 160 : if (!OidIsValid(publish_as_relid))
386 36 : publish_as_relid = relid;
387 : }
388 :
389 : /* Fetch the column list */
390 902 : pub = GetPublication(pubid);
391 902 : check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
392 :
393 902 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
394 : {
395 : /* With REPLICA IDENTITY FULL, no column list is allowed. */
396 200 : *invalid_column_list = (columns != NULL);
397 :
398 : /*
399 : * As we don't allow a column list with REPLICA IDENTITY FULL, the
400 : * publish_generated_columns option must be set to stored if the table
401 : * has any stored generated columns.
402 : */
403 200 : if (pubgencols_type != PUBLISH_GENCOLS_STORED &&
404 188 : relation->rd_att->constr &&
405 76 : relation->rd_att->constr->has_generated_stored)
406 6 : *invalid_gen_col = true;
407 :
408 : /*
409 : * Virtual generated columns are currently not supported for logical
410 : * replication at all.
411 : */
412 200 : if (relation->rd_att->constr &&
413 88 : relation->rd_att->constr->has_generated_virtual)
414 12 : *invalid_gen_col = true;
415 :
416 200 : if (*invalid_gen_col && *invalid_column_list)
417 0 : return true;
418 : }
419 :
420 : /* Remember columns that are part of the REPLICA IDENTITY */
421 902 : idattrs = RelationGetIndexAttrBitmap(relation,
422 : INDEX_ATTR_BITMAP_IDENTITY_KEY);
423 :
424 : /*
425 : * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
426 : * (to handle system columns the usual way), while column list does not
427 : * use offset, so we can't do bms_is_subset(). Instead, we have to loop
428 : * over the idattrs and check all of them are in the list.
429 : */
430 902 : x = -1;
431 1548 : while ((x = bms_next_member(idattrs, x)) >= 0)
432 : {
433 652 : AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber);
434 652 : Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
435 :
436 652 : if (columns == NULL)
437 : {
438 : /*
439 : * The publish_generated_columns option must be set to stored if
440 : * the REPLICA IDENTITY contains any stored generated column.
441 : */
442 442 : if (att->attgenerated == ATTRIBUTE_GENERATED_STORED && pubgencols_type != PUBLISH_GENCOLS_STORED)
443 : {
444 6 : *invalid_gen_col = true;
445 6 : break;
446 : }
447 :
448 : /*
449 : * The equivalent setting for virtual generated columns does not
450 : * exist yet.
451 : */
452 436 : if (att->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
453 : {
454 0 : *invalid_gen_col = true;
455 0 : break;
456 : }
457 :
458 : /* Skip validating the column list since it is not defined */
459 436 : continue;
460 : }
461 :
462 : /*
463 : * If pubviaroot is true, we are validating the column list of the
464 : * parent table, but the bitmap contains the replica identity
465 : * information of the child table. The parent/child attnums may not
466 : * match, so translate them to the parent - get the attname from the
467 : * child, and look it up in the parent.
468 : */
469 210 : if (pubviaroot)
470 : {
471 : /* attribute name in the child table */
472 80 : char *colname = get_attname(relid, attnum, false);
473 :
474 : /*
475 : * Determine the attnum for the attribute name in parent (we are
476 : * using the column list defined on the parent).
477 : */
478 80 : attnum = get_attnum(publish_as_relid, colname);
479 : }
480 :
481 : /* replica identity column, not covered by the column list */
482 210 : *invalid_column_list |= !bms_is_member(attnum, columns);
483 :
484 210 : if (*invalid_column_list && *invalid_gen_col)
485 0 : break;
486 : }
487 :
488 902 : bms_free(columns);
489 902 : bms_free(idattrs);
490 :
491 902 : return *invalid_column_list || *invalid_gen_col;
492 : }
493 :
494 : /* check_functions_in_node callback */
495 : static bool
496 430 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
497 : {
498 430 : return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
499 : func_id >= FirstNormalObjectId);
500 : }
501 :
502 : /*
503 : * The row filter walker checks if the row filter expression is a "simple
504 : * expression".
505 : *
506 : * It allows only simple or compound expressions such as:
507 : * - (Var Op Const)
508 : * - (Var Op Var)
509 : * - (Var Op Const) AND/OR (Var Op Const)
510 : * - etc
511 : * (where Var is a column of the table this filter belongs to)
512 : *
513 : * The simple expression has the following restrictions:
514 : * - User-defined operators are not allowed;
515 : * - User-defined functions are not allowed;
516 : * - User-defined types are not allowed;
517 : * - User-defined collations are not allowed;
518 : * - Non-immutable built-in functions are not allowed;
519 : * - System columns are not allowed.
520 : *
521 : * NOTES
522 : *
523 : * We don't allow user-defined functions/operators/types/collations because
524 : * (a) if a user drops a user-defined object used in a row filter expression or
525 : * if there is any other error while using it, the logical decoding
526 : * infrastructure won't be able to recover from such an error even if the
527 : * object is recreated again because a historic snapshot is used to evaluate
528 : * the row filter;
529 : * (b) a user-defined function can be used to access tables that could have
530 : * unpleasant results because a historic snapshot is used. That's why only
531 : * immutable built-in functions are allowed in row filter expressions.
532 : *
533 : * We don't allow system columns because currently, we don't have that
534 : * information in the tuple passed to downstream. Also, as we don't replicate
535 : * those to subscribers, there doesn't seem to be a need for a filter on those
536 : * columns.
537 : *
538 : * We can allow other node types after more analysis and testing.
539 : */
540 : static bool
541 1392 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
542 : {
543 1392 : char *errdetail_msg = NULL;
544 :
545 1392 : if (node == NULL)
546 6 : return false;
547 :
548 1386 : switch (nodeTag(node))
549 : {
550 370 : case T_Var:
551 : /* System columns are not allowed. */
552 370 : if (((Var *) node)->varattno < InvalidAttrNumber)
553 6 : errdetail_msg = _("System columns are not allowed.");
554 370 : break;
555 380 : case T_OpExpr:
556 : case T_DistinctExpr:
557 : case T_NullIfExpr:
558 : /* OK, except user-defined operators are not allowed. */
559 380 : if (((OpExpr *) node)->opno >= FirstNormalObjectId)
560 6 : errdetail_msg = _("User-defined operators are not allowed.");
561 380 : break;
562 6 : case T_ScalarArrayOpExpr:
563 : /* OK, except user-defined operators are not allowed. */
564 6 : if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
565 0 : errdetail_msg = _("User-defined operators are not allowed.");
566 :
567 : /*
568 : * We don't need to check the hashfuncid and negfuncid of
569 : * ScalarArrayOpExpr as those functions are only built for a
570 : * subquery.
571 : */
572 6 : break;
573 6 : case T_RowCompareExpr:
574 : {
575 : ListCell *opid;
576 :
577 : /* OK, except user-defined operators are not allowed. */
578 18 : foreach(opid, ((RowCompareExpr *) node)->opnos)
579 : {
580 12 : if (lfirst_oid(opid) >= FirstNormalObjectId)
581 : {
582 0 : errdetail_msg = _("User-defined operators are not allowed.");
583 0 : break;
584 : }
585 : }
586 : }
587 6 : break;
588 618 : case T_Const:
589 : case T_FuncExpr:
590 : case T_BoolExpr:
591 : case T_RelabelType:
592 : case T_CollateExpr:
593 : case T_CaseExpr:
594 : case T_CaseTestExpr:
595 : case T_ArrayExpr:
596 : case T_RowExpr:
597 : case T_CoalesceExpr:
598 : case T_MinMaxExpr:
599 : case T_XmlExpr:
600 : case T_NullTest:
601 : case T_BooleanTest:
602 : case T_List:
603 : /* OK, supported */
604 618 : break;
605 6 : default:
606 6 : errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
607 6 : break;
608 : }
609 :
610 : /*
611 : * For all the supported nodes, if we haven't already found a problem,
612 : * check the types, functions, and collations used in it. We check List
613 : * by walking through each element.
614 : */
615 1386 : if (!errdetail_msg && !IsA(node, List))
616 : {
617 1314 : if (exprType(node) >= FirstNormalObjectId)
618 6 : errdetail_msg = _("User-defined types are not allowed.");
619 1308 : else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
620 : pstate))
621 18 : errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
622 2580 : else if (exprCollation(node) >= FirstNormalObjectId ||
623 1290 : exprInputCollation(node) >= FirstNormalObjectId)
624 6 : errdetail_msg = _("User-defined collations are not allowed.");
625 : }
626 :
627 : /*
628 : * If we found a problem in this node, throw error now. Otherwise keep
629 : * going.
630 : */
631 1386 : if (errdetail_msg)
632 48 : ereport(ERROR,
633 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
634 : errmsg("invalid publication WHERE expression"),
635 : errdetail_internal("%s", errdetail_msg),
636 : parser_errposition(pstate, exprLocation(node))));
637 :
638 1338 : return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
639 : pstate);
640 : }
641 :
642 : /*
643 : * Check if the row filter expression is a "simple expression".
644 : *
645 : * See check_simple_rowfilter_expr_walker for details.
646 : */
647 : static bool
648 358 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
649 : {
650 358 : return check_simple_rowfilter_expr_walker(node, pstate);
651 : }
652 :
653 : /*
654 : * Transform the publication WHERE expression for all the relations in the list,
655 : * ensuring it is coerced to boolean and necessary collation information is
656 : * added if required, and add a new nsitem/RTE for the associated relation to
657 : * the ParseState's namespace list.
658 : *
659 : * Also check the publication row filter expression and throw an error if
660 : * anything not permitted or unexpected is encountered.
661 : */
662 : static void
663 1144 : TransformPubWhereClauses(List *tables, const char *queryString,
664 : bool pubviaroot)
665 : {
666 : ListCell *lc;
667 :
668 2282 : foreach(lc, tables)
669 : {
670 : ParseNamespaceItem *nsitem;
671 1204 : Node *whereclause = NULL;
672 : ParseState *pstate;
673 1204 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
674 :
675 1204 : if (pri->whereClause == NULL)
676 828 : continue;
677 :
678 : /*
679 : * If the publication doesn't publish changes via the root partitioned
680 : * table, the partition's row filter will be used. So disallow using
681 : * WHERE clause on partitioned table in this case.
682 : */
683 376 : if (!pubviaroot &&
684 354 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
685 6 : ereport(ERROR,
686 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
687 : errmsg("cannot use publication WHERE clause for relation \"%s\"",
688 : RelationGetRelationName(pri->relation)),
689 : errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
690 : "publish_via_partition_root")));
691 :
692 : /*
693 : * A fresh pstate is required so that we only have "this" table in its
694 : * rangetable
695 : */
696 370 : pstate = make_parsestate(NULL);
697 370 : pstate->p_sourcetext = queryString;
698 370 : nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
699 : AccessShareLock, NULL,
700 : false, false);
701 370 : addNSItemToQuery(pstate, nsitem, false, true, true);
702 :
703 370 : whereclause = transformWhereClause(pstate,
704 370 : copyObject(pri->whereClause),
705 : EXPR_KIND_WHERE,
706 : "PUBLICATION WHERE");
707 :
708 : /* Fix up collation information */
709 358 : assign_expr_collations(pstate, whereclause);
710 :
711 358 : whereclause = expand_generated_columns_in_expr(whereclause, pri->relation, 1);
712 :
713 : /*
714 : * We allow only simple expressions in row filters. See
715 : * check_simple_rowfilter_expr_walker.
716 : */
717 358 : check_simple_rowfilter_expr(whereclause, pstate);
718 :
719 310 : free_parsestate(pstate);
720 :
721 310 : pri->whereClause = whereclause;
722 : }
723 1078 : }
724 :
725 :
726 : /*
727 : * Given a list of tables that are going to be added to a publication,
728 : * verify that they fulfill the necessary preconditions, namely: no tables
729 : * have a column list if any schema is published; and partitioned tables do
730 : * not have column lists if publish_via_partition_root is not set.
731 : *
732 : * 'publish_schema' indicates that the publication contains any TABLES IN
733 : * SCHEMA elements (newly added in this command, or preexisting).
734 : * 'pubviaroot' is the value of publish_via_partition_root.
735 : */
736 : static void
737 1078 : CheckPubRelationColumnList(char *pubname, List *tables,
738 : bool publish_schema, bool pubviaroot)
739 : {
740 : ListCell *lc;
741 :
742 2186 : foreach(lc, tables)
743 : {
744 1138 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
745 :
746 1138 : if (pri->columns == NIL)
747 758 : continue;
748 :
749 : /*
750 : * Disallow specifying column list if any schema is in the
751 : * publication.
752 : *
753 : * XXX We could instead just forbid the case when the publication
754 : * tries to publish the table with a column list and a schema for that
755 : * table. However, if we do that then we need a restriction during
756 : * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
757 : * seem to be a good idea.
758 : */
759 380 : if (publish_schema)
760 24 : ereport(ERROR,
761 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
762 : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
763 : get_namespace_name(RelationGetNamespace(pri->relation)),
764 : RelationGetRelationName(pri->relation), pubname),
765 : errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
766 :
767 : /*
768 : * If the publication doesn't publish changes via the root partitioned
769 : * table, the partition's column list will be used. So disallow using
770 : * a column list on the partitioned table in this case.
771 : */
772 356 : if (!pubviaroot &&
773 280 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
774 6 : ereport(ERROR,
775 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
776 : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
777 : get_namespace_name(RelationGetNamespace(pri->relation)),
778 : RelationGetRelationName(pri->relation), pubname),
779 : errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
780 : "publish_via_partition_root")));
781 : }
782 1048 : }
783 :
784 : /*
785 : * Create new publication.
786 : */
787 : ObjectAddress
788 790 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
789 : {
790 : Relation rel;
791 : ObjectAddress myself;
792 : Oid puboid;
793 : bool nulls[Natts_pg_publication];
794 : Datum values[Natts_pg_publication];
795 : HeapTuple tup;
796 : bool publish_given;
797 : PublicationActions pubactions;
798 : bool publish_via_partition_root_given;
799 : bool publish_via_partition_root;
800 : bool publish_generated_columns_given;
801 : char publish_generated_columns;
802 : AclResult aclresult;
803 790 : List *relations = NIL;
804 790 : List *schemaidlist = NIL;
805 :
806 : /* must have CREATE privilege on database */
807 790 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
808 790 : if (aclresult != ACLCHECK_OK)
809 6 : aclcheck_error(aclresult, OBJECT_DATABASE,
810 6 : get_database_name(MyDatabaseId));
811 :
812 : /* FOR ALL TABLES requires superuser */
813 784 : if (stmt->for_all_tables && !superuser())
814 0 : ereport(ERROR,
815 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
816 : errmsg("must be superuser to create FOR ALL TABLES publication")));
817 :
818 784 : rel = table_open(PublicationRelationId, RowExclusiveLock);
819 :
820 : /* Check if name is used */
821 784 : puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
822 : CStringGetDatum(stmt->pubname));
823 784 : if (OidIsValid(puboid))
824 6 : ereport(ERROR,
825 : (errcode(ERRCODE_DUPLICATE_OBJECT),
826 : errmsg("publication \"%s\" already exists",
827 : stmt->pubname)));
828 :
829 : /* Form a tuple. */
830 778 : memset(values, 0, sizeof(values));
831 778 : memset(nulls, false, sizeof(nulls));
832 :
833 778 : values[Anum_pg_publication_pubname - 1] =
834 778 : DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
835 778 : values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
836 :
837 778 : parse_publication_options(pstate,
838 : stmt->options,
839 : &publish_given, &pubactions,
840 : &publish_via_partition_root_given,
841 : &publish_via_partition_root,
842 : &publish_generated_columns_given,
843 : &publish_generated_columns);
844 :
845 748 : puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
846 : Anum_pg_publication_oid);
847 748 : values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
848 748 : values[Anum_pg_publication_puballtables - 1] =
849 748 : BoolGetDatum(stmt->for_all_tables);
850 748 : values[Anum_pg_publication_pubinsert - 1] =
851 748 : BoolGetDatum(pubactions.pubinsert);
852 748 : values[Anum_pg_publication_pubupdate - 1] =
853 748 : BoolGetDatum(pubactions.pubupdate);
854 748 : values[Anum_pg_publication_pubdelete - 1] =
855 748 : BoolGetDatum(pubactions.pubdelete);
856 748 : values[Anum_pg_publication_pubtruncate - 1] =
857 748 : BoolGetDatum(pubactions.pubtruncate);
858 748 : values[Anum_pg_publication_pubviaroot - 1] =
859 748 : BoolGetDatum(publish_via_partition_root);
860 748 : values[Anum_pg_publication_pubgencols - 1] =
861 748 : CharGetDatum(publish_generated_columns);
862 :
863 748 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
864 :
865 : /* Insert tuple into catalog. */
866 748 : CatalogTupleInsert(rel, tup);
867 748 : heap_freetuple(tup);
868 :
869 748 : recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
870 :
871 748 : ObjectAddressSet(myself, PublicationRelationId, puboid);
872 :
873 : /* Make the changes visible. */
874 748 : CommandCounterIncrement();
875 :
876 : /* Associate objects with the publication. */
877 748 : if (stmt->for_all_tables)
878 : {
879 : /* Invalidate relcache so that publication info is rebuilt. */
880 88 : CacheInvalidateRelcacheAll();
881 : }
882 : else
883 : {
884 660 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
885 : &schemaidlist);
886 :
887 : /* FOR TABLES IN SCHEMA requires superuser */
888 642 : if (schemaidlist != NIL && !superuser())
889 6 : ereport(ERROR,
890 : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
891 : errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
892 :
893 636 : if (relations != NIL)
894 : {
895 : List *rels;
896 :
897 442 : rels = OpenTableList(relations);
898 418 : TransformPubWhereClauses(rels, pstate->p_sourcetext,
899 : publish_via_partition_root);
900 :
901 388 : CheckPubRelationColumnList(stmt->pubname, rels,
902 : schemaidlist != NIL,
903 : publish_via_partition_root);
904 :
905 382 : PublicationAddTables(puboid, rels, true, NULL);
906 356 : CloseTableList(rels);
907 : }
908 :
909 550 : if (schemaidlist != NIL)
910 : {
911 : /*
912 : * Schema lock is held until the publication is created to prevent
913 : * concurrent schema deletion.
914 : */
915 134 : LockSchemaList(schemaidlist);
916 134 : PublicationAddSchemas(puboid, schemaidlist, true, NULL);
917 : }
918 : }
919 :
920 632 : table_close(rel, RowExclusiveLock);
921 :
922 632 : InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
923 :
924 632 : if (wal_level != WAL_LEVEL_LOGICAL)
925 368 : ereport(WARNING,
926 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
927 : errmsg("\"wal_level\" is insufficient to publish logical changes"),
928 : errhint("Set \"wal_level\" to \"logical\" before creating subscriptions.")));
929 :
930 632 : return myself;
931 : }
932 :
933 : /*
934 : * Change options of a publication.
935 : */
936 : static void
937 116 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
938 : Relation rel, HeapTuple tup)
939 : {
940 : bool nulls[Natts_pg_publication];
941 : bool replaces[Natts_pg_publication];
942 : Datum values[Natts_pg_publication];
943 : bool publish_given;
944 : PublicationActions pubactions;
945 : bool publish_via_partition_root_given;
946 : bool publish_via_partition_root;
947 : bool publish_generated_columns_given;
948 : char publish_generated_columns;
949 : ObjectAddress obj;
950 : Form_pg_publication pubform;
951 116 : List *root_relids = NIL;
952 : ListCell *lc;
953 :
954 116 : parse_publication_options(pstate,
955 : stmt->options,
956 : &publish_given, &pubactions,
957 : &publish_via_partition_root_given,
958 : &publish_via_partition_root,
959 : &publish_generated_columns_given,
960 : &publish_generated_columns);
961 :
962 116 : pubform = (Form_pg_publication) GETSTRUCT(tup);
963 :
964 : /*
965 : * If the publication doesn't publish changes via the root partitioned
966 : * table, the partition's row filter and column list will be used. So
967 : * disallow using WHERE clause and column lists on partitioned table in
968 : * this case.
969 : */
970 116 : if (!pubform->puballtables && publish_via_partition_root_given &&
971 80 : !publish_via_partition_root)
972 : {
973 : /*
974 : * Lock the publication so nobody else can do anything with it. This
975 : * prevents concurrent alter to add partitioned table(s) with WHERE
976 : * clause(s) and/or column lists which we don't allow when not
977 : * publishing via root.
978 : */
979 48 : LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
980 : AccessShareLock);
981 :
982 48 : root_relids = GetPublicationRelations(pubform->oid,
983 : PUBLICATION_PART_ROOT);
984 :
985 84 : foreach(lc, root_relids)
986 : {
987 48 : Oid relid = lfirst_oid(lc);
988 : HeapTuple rftuple;
989 : char relkind;
990 : char *relname;
991 : bool has_rowfilter;
992 : bool has_collist;
993 :
994 : /*
995 : * Beware: we don't have lock on the relations, so cope silently
996 : * with the cache lookups returning NULL.
997 : */
998 :
999 48 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1000 : ObjectIdGetDatum(relid),
1001 : ObjectIdGetDatum(pubform->oid));
1002 48 : if (!HeapTupleIsValid(rftuple))
1003 0 : continue;
1004 48 : has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
1005 48 : has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
1006 48 : if (!has_rowfilter && !has_collist)
1007 : {
1008 12 : ReleaseSysCache(rftuple);
1009 12 : continue;
1010 : }
1011 :
1012 36 : relkind = get_rel_relkind(relid);
1013 36 : if (relkind != RELKIND_PARTITIONED_TABLE)
1014 : {
1015 24 : ReleaseSysCache(rftuple);
1016 24 : continue;
1017 : }
1018 12 : relname = get_rel_name(relid);
1019 12 : if (relname == NULL) /* table concurrently dropped */
1020 : {
1021 0 : ReleaseSysCache(rftuple);
1022 0 : continue;
1023 : }
1024 :
1025 12 : if (has_rowfilter)
1026 6 : ereport(ERROR,
1027 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1028 : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1029 : "publish_via_partition_root",
1030 : stmt->pubname),
1031 : errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1032 : relname, "publish_via_partition_root")));
1033 : Assert(has_collist);
1034 6 : ereport(ERROR,
1035 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1036 : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1037 : "publish_via_partition_root",
1038 : stmt->pubname),
1039 : errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1040 : relname, "publish_via_partition_root")));
1041 : }
1042 : }
1043 :
1044 : /* Everything ok, form a new tuple. */
1045 104 : memset(values, 0, sizeof(values));
1046 104 : memset(nulls, false, sizeof(nulls));
1047 104 : memset(replaces, false, sizeof(replaces));
1048 :
1049 104 : if (publish_given)
1050 : {
1051 28 : values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
1052 28 : replaces[Anum_pg_publication_pubinsert - 1] = true;
1053 :
1054 28 : values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
1055 28 : replaces[Anum_pg_publication_pubupdate - 1] = true;
1056 :
1057 28 : values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
1058 28 : replaces[Anum_pg_publication_pubdelete - 1] = true;
1059 :
1060 28 : values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
1061 28 : replaces[Anum_pg_publication_pubtruncate - 1] = true;
1062 : }
1063 :
1064 104 : if (publish_via_partition_root_given)
1065 : {
1066 70 : values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
1067 70 : replaces[Anum_pg_publication_pubviaroot - 1] = true;
1068 : }
1069 :
1070 104 : if (publish_generated_columns_given)
1071 : {
1072 6 : values[Anum_pg_publication_pubgencols - 1] = CharGetDatum(publish_generated_columns);
1073 6 : replaces[Anum_pg_publication_pubgencols - 1] = true;
1074 : }
1075 :
1076 104 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1077 : replaces);
1078 :
1079 : /* Update the catalog. */
1080 104 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1081 :
1082 104 : CommandCounterIncrement();
1083 :
1084 104 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1085 :
1086 : /* Invalidate the relcache. */
1087 104 : if (pubform->puballtables)
1088 : {
1089 8 : CacheInvalidateRelcacheAll();
1090 : }
1091 : else
1092 : {
1093 96 : List *relids = NIL;
1094 96 : List *schemarelids = NIL;
1095 :
1096 : /*
1097 : * For any partitioned tables contained in the publication, we must
1098 : * invalidate all partitions contained in the respective partition
1099 : * trees, not just those explicitly mentioned in the publication.
1100 : */
1101 96 : if (root_relids == NIL)
1102 60 : relids = GetPublicationRelations(pubform->oid,
1103 : PUBLICATION_PART_ALL);
1104 : else
1105 : {
1106 : /*
1107 : * We already got tables explicitly mentioned in the publication.
1108 : * Now get all partitions for the partitioned table in the list.
1109 : */
1110 72 : foreach(lc, root_relids)
1111 36 : relids = GetPubPartitionOptionRelations(relids,
1112 : PUBLICATION_PART_ALL,
1113 : lfirst_oid(lc));
1114 : }
1115 :
1116 96 : schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
1117 : PUBLICATION_PART_ALL);
1118 96 : relids = list_concat_unique_oid(relids, schemarelids);
1119 :
1120 96 : InvalidatePublicationRels(relids);
1121 : }
1122 :
1123 104 : ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
1124 104 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1125 : (Node *) stmt);
1126 :
1127 104 : InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
1128 104 : }
1129 :
1130 : /*
1131 : * Invalidate the relations.
1132 : */
1133 : void
1134 2336 : InvalidatePublicationRels(List *relids)
1135 : {
1136 : /*
1137 : * We don't want to send too many individual messages, at some point it's
1138 : * cheaper to just reset whole relcache.
1139 : */
1140 2336 : if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
1141 : {
1142 : ListCell *lc;
1143 :
1144 19572 : foreach(lc, relids)
1145 17236 : CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
1146 : }
1147 : else
1148 0 : CacheInvalidateRelcacheAll();
1149 2336 : }
1150 :
1151 : /*
1152 : * Add or remove table to/from publication.
1153 : */
1154 : static void
1155 890 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
1156 : List *tables, const char *queryString,
1157 : bool publish_schema)
1158 : {
1159 890 : List *rels = NIL;
1160 890 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1161 890 : Oid pubid = pubform->oid;
1162 :
1163 : /*
1164 : * Nothing to do if no objects, except in SET: for that it is quite
1165 : * possible that user has not specified any tables in which case we need
1166 : * to remove all the existing tables.
1167 : */
1168 890 : if (!tables && stmt->action != AP_SetObjects)
1169 66 : return;
1170 :
1171 824 : rels = OpenTableList(tables);
1172 :
1173 824 : if (stmt->action == AP_AddObjects)
1174 : {
1175 280 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1176 :
1177 262 : publish_schema |= is_schema_publication(pubid);
1178 :
1179 262 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1180 262 : pubform->pubviaroot);
1181 :
1182 250 : PublicationAddTables(pubid, rels, false, stmt);
1183 : }
1184 544 : else if (stmt->action == AP_DropObjects)
1185 98 : PublicationDropTables(pubid, rels, false);
1186 : else /* AP_SetObjects */
1187 : {
1188 446 : List *oldrelids = GetPublicationRelations(pubid,
1189 : PUBLICATION_PART_ROOT);
1190 446 : List *delrels = NIL;
1191 : ListCell *oldlc;
1192 :
1193 446 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1194 :
1195 428 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1196 428 : pubform->pubviaroot);
1197 :
1198 : /*
1199 : * To recreate the relation list for the publication, look for
1200 : * existing relations that do not need to be dropped.
1201 : */
1202 806 : foreach(oldlc, oldrelids)
1203 : {
1204 402 : Oid oldrelid = lfirst_oid(oldlc);
1205 : ListCell *newlc;
1206 : PublicationRelInfo *oldrel;
1207 402 : bool found = false;
1208 : HeapTuple rftuple;
1209 402 : Node *oldrelwhereclause = NULL;
1210 402 : Bitmapset *oldcolumns = NULL;
1211 :
1212 : /* look up the cache for the old relmap */
1213 402 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1214 : ObjectIdGetDatum(oldrelid),
1215 : ObjectIdGetDatum(pubid));
1216 :
1217 : /*
1218 : * See if the existing relation currently has a WHERE clause or a
1219 : * column list. We need to compare those too.
1220 : */
1221 402 : if (HeapTupleIsValid(rftuple))
1222 : {
1223 402 : bool isnull = true;
1224 : Datum whereClauseDatum;
1225 : Datum columnListDatum;
1226 :
1227 : /* Load the WHERE clause for this table. */
1228 402 : whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1229 : Anum_pg_publication_rel_prqual,
1230 : &isnull);
1231 402 : if (!isnull)
1232 210 : oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
1233 :
1234 : /* Transform the int2vector column list to a bitmap. */
1235 402 : columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1236 : Anum_pg_publication_rel_prattrs,
1237 : &isnull);
1238 :
1239 402 : if (!isnull)
1240 136 : oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
1241 :
1242 402 : ReleaseSysCache(rftuple);
1243 : }
1244 :
1245 778 : foreach(newlc, rels)
1246 : {
1247 : PublicationRelInfo *newpubrel;
1248 : Oid newrelid;
1249 404 : Bitmapset *newcolumns = NULL;
1250 :
1251 404 : newpubrel = (PublicationRelInfo *) lfirst(newlc);
1252 404 : newrelid = RelationGetRelid(newpubrel->relation);
1253 :
1254 : /*
1255 : * Validate the column list. If the column list or WHERE
1256 : * clause changes, then the validation done here will be
1257 : * duplicated inside PublicationAddTables(). The validation
1258 : * is cheap enough that that seems harmless.
1259 : */
1260 404 : newcolumns = pub_collist_validate(newpubrel->relation,
1261 : newpubrel->columns);
1262 :
1263 : /*
1264 : * Check if any of the new set of relations matches with the
1265 : * existing relations in the publication. Additionally, if the
1266 : * relation has an associated WHERE clause, check the WHERE
1267 : * expressions also match. Same for the column list. Drop the
1268 : * rest.
1269 : */
1270 392 : if (newrelid == oldrelid)
1271 : {
1272 284 : if (equal(oldrelwhereclause, newpubrel->whereClause) &&
1273 80 : bms_equal(oldcolumns, newcolumns))
1274 : {
1275 16 : found = true;
1276 16 : break;
1277 : }
1278 : }
1279 : }
1280 :
1281 : /*
1282 : * Add the non-matched relations to a list so that they can be
1283 : * dropped.
1284 : */
1285 390 : if (!found)
1286 : {
1287 374 : oldrel = palloc(sizeof(PublicationRelInfo));
1288 374 : oldrel->whereClause = NULL;
1289 374 : oldrel->columns = NIL;
1290 374 : oldrel->relation = table_open(oldrelid,
1291 : ShareUpdateExclusiveLock);
1292 374 : delrels = lappend(delrels, oldrel);
1293 : }
1294 : }
1295 :
1296 : /* And drop them. */
1297 404 : PublicationDropTables(pubid, delrels, true);
1298 :
1299 : /*
1300 : * Don't bother calculating the difference for adding, we'll catch and
1301 : * skip existing ones when doing catalog update.
1302 : */
1303 404 : PublicationAddTables(pubid, rels, true, stmt);
1304 :
1305 404 : CloseTableList(delrels);
1306 : }
1307 :
1308 692 : CloseTableList(rels);
1309 : }
1310 :
1311 : /*
1312 : * Alter the publication schemas.
1313 : *
1314 : * Add or remove schemas to/from publication.
1315 : */
1316 : static void
1317 758 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
1318 : HeapTuple tup, List *schemaidlist)
1319 : {
1320 758 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1321 :
1322 : /*
1323 : * Nothing to do if no objects, except in SET: for that it is quite
1324 : * possible that user has not specified any schemas in which case we need
1325 : * to remove all the existing schemas.
1326 : */
1327 758 : if (!schemaidlist && stmt->action != AP_SetObjects)
1328 288 : return;
1329 :
1330 : /*
1331 : * Schema lock is held until the publication is altered to prevent
1332 : * concurrent schema deletion.
1333 : */
1334 470 : LockSchemaList(schemaidlist);
1335 470 : if (stmt->action == AP_AddObjects)
1336 : {
1337 : ListCell *lc;
1338 : List *reloids;
1339 :
1340 28 : reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
1341 :
1342 36 : foreach(lc, reloids)
1343 : {
1344 : HeapTuple coltuple;
1345 :
1346 14 : coltuple = SearchSysCache2(PUBLICATIONRELMAP,
1347 : ObjectIdGetDatum(lfirst_oid(lc)),
1348 : ObjectIdGetDatum(pubform->oid));
1349 :
1350 14 : if (!HeapTupleIsValid(coltuple))
1351 0 : continue;
1352 :
1353 : /*
1354 : * Disallow adding schema if column list is already part of the
1355 : * publication. See CheckPubRelationColumnList.
1356 : */
1357 14 : if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
1358 6 : ereport(ERROR,
1359 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1360 : errmsg("cannot add schema to publication \"%s\"",
1361 : stmt->pubname),
1362 : errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1363 :
1364 8 : ReleaseSysCache(coltuple);
1365 : }
1366 :
1367 22 : PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
1368 : }
1369 442 : else if (stmt->action == AP_DropObjects)
1370 38 : PublicationDropSchemas(pubform->oid, schemaidlist, false);
1371 : else /* AP_SetObjects */
1372 : {
1373 404 : List *oldschemaids = GetPublicationSchemas(pubform->oid);
1374 404 : List *delschemas = NIL;
1375 :
1376 : /* Identify which schemas should be dropped */
1377 404 : delschemas = list_difference_oid(oldschemaids, schemaidlist);
1378 :
1379 : /*
1380 : * Schema lock is held until the publication is altered to prevent
1381 : * concurrent schema deletion.
1382 : */
1383 404 : LockSchemaList(delschemas);
1384 :
1385 : /* And drop them */
1386 404 : PublicationDropSchemas(pubform->oid, delschemas, true);
1387 :
1388 : /*
1389 : * Don't bother calculating the difference for adding, we'll catch and
1390 : * skip existing ones when doing catalog update.
1391 : */
1392 404 : PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
1393 : }
1394 : }
1395 :
1396 : /*
1397 : * Check if relations and schemas can be in a given publication and throw
1398 : * appropriate error if not.
1399 : */
1400 : static void
1401 932 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
1402 : List *tables, List *schemaidlist)
1403 : {
1404 932 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1405 :
1406 932 : if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
1407 94 : schemaidlist && !superuser())
1408 6 : ereport(ERROR,
1409 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1410 : errmsg("must be superuser to add or set schemas")));
1411 :
1412 : /*
1413 : * Check that user is allowed to manipulate the publication tables in
1414 : * schema
1415 : */
1416 926 : if (schemaidlist && pubform->puballtables)
1417 18 : ereport(ERROR,
1418 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1419 : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1420 : NameStr(pubform->pubname)),
1421 : errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
1422 :
1423 : /* Check that user is allowed to manipulate the publication tables. */
1424 908 : if (tables && pubform->puballtables)
1425 18 : ereport(ERROR,
1426 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1427 : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1428 : NameStr(pubform->pubname)),
1429 : errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
1430 890 : }
1431 :
1432 : /*
1433 : * Alter the existing publication.
1434 : *
1435 : * This is dispatcher function for AlterPublicationOptions,
1436 : * AlterPublicationSchemas and AlterPublicationTables.
1437 : */
1438 : void
1439 1066 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
1440 : {
1441 : Relation rel;
1442 : HeapTuple tup;
1443 : Form_pg_publication pubform;
1444 :
1445 1066 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1446 :
1447 1066 : tup = SearchSysCacheCopy1(PUBLICATIONNAME,
1448 : CStringGetDatum(stmt->pubname));
1449 :
1450 1066 : if (!HeapTupleIsValid(tup))
1451 0 : ereport(ERROR,
1452 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1453 : errmsg("publication \"%s\" does not exist",
1454 : stmt->pubname)));
1455 :
1456 1066 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1457 :
1458 : /* must be owner */
1459 1066 : if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
1460 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
1461 0 : stmt->pubname);
1462 :
1463 1066 : if (stmt->options)
1464 116 : AlterPublicationOptions(pstate, stmt, rel, tup);
1465 : else
1466 : {
1467 950 : List *relations = NIL;
1468 950 : List *schemaidlist = NIL;
1469 950 : Oid pubid = pubform->oid;
1470 :
1471 950 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
1472 : &schemaidlist);
1473 :
1474 932 : CheckAlterPublication(stmt, tup, relations, schemaidlist);
1475 :
1476 890 : heap_freetuple(tup);
1477 :
1478 : /* Lock the publication so nobody else can do anything with it. */
1479 890 : LockDatabaseObject(PublicationRelationId, pubid, 0,
1480 : AccessExclusiveLock);
1481 :
1482 : /*
1483 : * It is possible that by the time we acquire the lock on publication,
1484 : * concurrent DDL has removed it. We can test this by checking the
1485 : * existence of publication. We get the tuple again to avoid the risk
1486 : * of any publication option getting changed.
1487 : */
1488 890 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1489 890 : if (!HeapTupleIsValid(tup))
1490 0 : ereport(ERROR,
1491 : errcode(ERRCODE_UNDEFINED_OBJECT),
1492 : errmsg("publication \"%s\" does not exist",
1493 : stmt->pubname));
1494 :
1495 890 : AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
1496 : schemaidlist != NIL);
1497 758 : AlterPublicationSchemas(stmt, tup, schemaidlist);
1498 : }
1499 :
1500 : /* Cleanup. */
1501 844 : heap_freetuple(tup);
1502 844 : table_close(rel, RowExclusiveLock);
1503 844 : }
1504 :
1505 : /*
1506 : * Remove relation from publication by mapping OID.
1507 : */
1508 : void
1509 838 : RemovePublicationRelById(Oid proid)
1510 : {
1511 : Relation rel;
1512 : HeapTuple tup;
1513 : Form_pg_publication_rel pubrel;
1514 838 : List *relids = NIL;
1515 :
1516 838 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
1517 :
1518 838 : tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
1519 :
1520 838 : if (!HeapTupleIsValid(tup))
1521 0 : elog(ERROR, "cache lookup failed for publication table %u",
1522 : proid);
1523 :
1524 838 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1525 :
1526 : /*
1527 : * Invalidate relcache so that publication info is rebuilt.
1528 : *
1529 : * For the partitioned tables, we must invalidate all partitions contained
1530 : * in the respective partition hierarchies, not just the one explicitly
1531 : * mentioned in the publication. This is required because we implicitly
1532 : * publish the child tables when the parent table is published.
1533 : */
1534 838 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
1535 : pubrel->prrelid);
1536 :
1537 838 : InvalidatePublicationRels(relids);
1538 :
1539 838 : CatalogTupleDelete(rel, &tup->t_self);
1540 :
1541 838 : ReleaseSysCache(tup);
1542 :
1543 838 : table_close(rel, RowExclusiveLock);
1544 838 : }
1545 :
1546 : /*
1547 : * Remove the publication by mapping OID.
1548 : */
1549 : void
1550 458 : RemovePublicationById(Oid pubid)
1551 : {
1552 : Relation rel;
1553 : HeapTuple tup;
1554 : Form_pg_publication pubform;
1555 :
1556 458 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1557 :
1558 458 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1559 458 : if (!HeapTupleIsValid(tup))
1560 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1561 :
1562 458 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1563 :
1564 : /* Invalidate relcache so that publication info is rebuilt. */
1565 458 : if (pubform->puballtables)
1566 52 : CacheInvalidateRelcacheAll();
1567 :
1568 458 : CatalogTupleDelete(rel, &tup->t_self);
1569 :
1570 458 : ReleaseSysCache(tup);
1571 :
1572 458 : table_close(rel, RowExclusiveLock);
1573 458 : }
1574 :
1575 : /*
1576 : * Remove schema from publication by mapping OID.
1577 : */
1578 : void
1579 192 : RemovePublicationSchemaById(Oid psoid)
1580 : {
1581 : Relation rel;
1582 : HeapTuple tup;
1583 192 : List *schemaRels = NIL;
1584 : Form_pg_publication_namespace pubsch;
1585 :
1586 192 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
1587 :
1588 192 : tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
1589 :
1590 192 : if (!HeapTupleIsValid(tup))
1591 0 : elog(ERROR, "cache lookup failed for publication schema %u", psoid);
1592 :
1593 192 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1594 :
1595 : /*
1596 : * Invalidate relcache so that publication info is rebuilt. See
1597 : * RemovePublicationRelById for why we need to consider all the
1598 : * partitions.
1599 : */
1600 192 : schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
1601 : PUBLICATION_PART_ALL);
1602 192 : InvalidatePublicationRels(schemaRels);
1603 :
1604 192 : CatalogTupleDelete(rel, &tup->t_self);
1605 :
1606 192 : ReleaseSysCache(tup);
1607 :
1608 192 : table_close(rel, RowExclusiveLock);
1609 192 : }
1610 :
1611 : /*
1612 : * Open relations specified by a PublicationTable list.
1613 : * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1614 : * add them to a publication.
1615 : */
1616 : static List *
1617 1266 : OpenTableList(List *tables)
1618 : {
1619 1266 : List *relids = NIL;
1620 1266 : List *rels = NIL;
1621 : ListCell *lc;
1622 1266 : List *relids_with_rf = NIL;
1623 1266 : List *relids_with_collist = NIL;
1624 :
1625 : /*
1626 : * Open, share-lock, and check all the explicitly-specified relations
1627 : */
1628 2590 : foreach(lc, tables)
1629 : {
1630 1348 : PublicationTable *t = lfirst_node(PublicationTable, lc);
1631 1348 : bool recurse = t->relation->inh;
1632 : Relation rel;
1633 : Oid myrelid;
1634 : PublicationRelInfo *pub_rel;
1635 :
1636 : /* Allow query cancel in case this takes a long time */
1637 1348 : CHECK_FOR_INTERRUPTS();
1638 :
1639 1348 : rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
1640 1348 : myrelid = RelationGetRelid(rel);
1641 :
1642 : /*
1643 : * Filter out duplicates if user specifies "foo, foo".
1644 : *
1645 : * Note that this algorithm is known to not be very efficient (O(N^2))
1646 : * but given that it only works on list of tables given to us by user
1647 : * it's deemed acceptable.
1648 : */
1649 1348 : if (list_member_oid(relids, myrelid))
1650 : {
1651 : /* Disallow duplicate tables if there are any with row filters. */
1652 24 : if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
1653 12 : ereport(ERROR,
1654 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1655 : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1656 : RelationGetRelationName(rel))));
1657 :
1658 : /* Disallow duplicate tables if there are any with column lists. */
1659 12 : if (t->columns || list_member_oid(relids_with_collist, myrelid))
1660 12 : ereport(ERROR,
1661 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1662 : errmsg("conflicting or redundant column lists for table \"%s\"",
1663 : RelationGetRelationName(rel))));
1664 :
1665 0 : table_close(rel, ShareUpdateExclusiveLock);
1666 0 : continue;
1667 : }
1668 :
1669 1324 : pub_rel = palloc(sizeof(PublicationRelInfo));
1670 1324 : pub_rel->relation = rel;
1671 1324 : pub_rel->whereClause = t->whereClause;
1672 1324 : pub_rel->columns = t->columns;
1673 1324 : rels = lappend(rels, pub_rel);
1674 1324 : relids = lappend_oid(relids, myrelid);
1675 :
1676 1324 : if (t->whereClause)
1677 386 : relids_with_rf = lappend_oid(relids_with_rf, myrelid);
1678 :
1679 1324 : if (t->columns)
1680 386 : relids_with_collist = lappend_oid(relids_with_collist, myrelid);
1681 :
1682 : /*
1683 : * Add children of this rel, if requested, so that they too are added
1684 : * to the publication. A partitioned table can't have any inheritance
1685 : * children other than its partitions, which need not be explicitly
1686 : * added to the publication.
1687 : */
1688 1324 : if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1689 : {
1690 : List *children;
1691 : ListCell *child;
1692 :
1693 1140 : children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
1694 : NULL);
1695 :
1696 2288 : foreach(child, children)
1697 : {
1698 1148 : Oid childrelid = lfirst_oid(child);
1699 :
1700 : /* Allow query cancel in case this takes a long time */
1701 1148 : CHECK_FOR_INTERRUPTS();
1702 :
1703 : /*
1704 : * Skip duplicates if user specified both parent and child
1705 : * tables.
1706 : */
1707 1148 : if (list_member_oid(relids, childrelid))
1708 : {
1709 : /*
1710 : * We don't allow to specify row filter for both parent
1711 : * and child table at the same time as it is not very
1712 : * clear which one should be given preference.
1713 : */
1714 1140 : if (childrelid != myrelid &&
1715 0 : (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
1716 0 : ereport(ERROR,
1717 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1718 : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1719 : RelationGetRelationName(rel))));
1720 :
1721 : /*
1722 : * We don't allow to specify column list for both parent
1723 : * and child table at the same time as it is not very
1724 : * clear which one should be given preference.
1725 : */
1726 1140 : if (childrelid != myrelid &&
1727 0 : (t->columns || list_member_oid(relids_with_collist, childrelid)))
1728 0 : ereport(ERROR,
1729 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1730 : errmsg("conflicting or redundant column lists for table \"%s\"",
1731 : RelationGetRelationName(rel))));
1732 :
1733 1140 : continue;
1734 : }
1735 :
1736 : /* find_all_inheritors already got lock */
1737 8 : rel = table_open(childrelid, NoLock);
1738 8 : pub_rel = palloc(sizeof(PublicationRelInfo));
1739 8 : pub_rel->relation = rel;
1740 : /* child inherits WHERE clause from parent */
1741 8 : pub_rel->whereClause = t->whereClause;
1742 :
1743 : /* child inherits column list from parent */
1744 8 : pub_rel->columns = t->columns;
1745 8 : rels = lappend(rels, pub_rel);
1746 8 : relids = lappend_oid(relids, childrelid);
1747 :
1748 8 : if (t->whereClause)
1749 2 : relids_with_rf = lappend_oid(relids_with_rf, childrelid);
1750 :
1751 8 : if (t->columns)
1752 0 : relids_with_collist = lappend_oid(relids_with_collist, childrelid);
1753 : }
1754 : }
1755 : }
1756 :
1757 1242 : list_free(relids);
1758 1242 : list_free(relids_with_rf);
1759 :
1760 1242 : return rels;
1761 : }
1762 :
1763 : /*
1764 : * Close all relations in the list.
1765 : */
1766 : static void
1767 1452 : CloseTableList(List *rels)
1768 : {
1769 : ListCell *lc;
1770 :
1771 2940 : foreach(lc, rels)
1772 : {
1773 : PublicationRelInfo *pub_rel;
1774 :
1775 1488 : pub_rel = (PublicationRelInfo *) lfirst(lc);
1776 1488 : table_close(pub_rel->relation, NoLock);
1777 : }
1778 :
1779 1452 : list_free_deep(rels);
1780 1452 : }
1781 :
1782 : /*
1783 : * Lock the schemas specified in the schema list in AccessShareLock mode in
1784 : * order to prevent concurrent schema deletion.
1785 : */
1786 : static void
1787 1008 : LockSchemaList(List *schemalist)
1788 : {
1789 : ListCell *lc;
1790 :
1791 1292 : foreach(lc, schemalist)
1792 : {
1793 284 : Oid schemaid = lfirst_oid(lc);
1794 :
1795 : /* Allow query cancel in case this takes a long time */
1796 284 : CHECK_FOR_INTERRUPTS();
1797 284 : LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
1798 :
1799 : /*
1800 : * It is possible that by the time we acquire the lock on schema,
1801 : * concurrent DDL has removed it. We can test this by checking the
1802 : * existence of schema.
1803 : */
1804 284 : if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
1805 0 : ereport(ERROR,
1806 : errcode(ERRCODE_UNDEFINED_SCHEMA),
1807 : errmsg("schema with OID %u does not exist", schemaid));
1808 : }
1809 1008 : }
1810 :
1811 : /*
1812 : * Add listed tables to the publication.
1813 : */
1814 : static void
1815 1036 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
1816 : AlterPublicationStmt *stmt)
1817 : {
1818 : ListCell *lc;
1819 :
1820 : Assert(!stmt || !stmt->for_all_tables);
1821 :
1822 2064 : foreach(lc, rels)
1823 : {
1824 1096 : PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
1825 1096 : Relation rel = pub_rel->relation;
1826 : ObjectAddress obj;
1827 :
1828 : /* Must be owner of the table or superuser. */
1829 1096 : if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
1830 6 : aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
1831 6 : RelationGetRelationName(rel));
1832 :
1833 1090 : obj = publication_add_relation(pubid, pub_rel, if_not_exists);
1834 1028 : if (stmt)
1835 : {
1836 612 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1837 : (Node *) stmt);
1838 :
1839 612 : InvokeObjectPostCreateHook(PublicationRelRelationId,
1840 : obj.objectId, 0);
1841 : }
1842 : }
1843 968 : }
1844 :
1845 : /*
1846 : * Remove listed tables from the publication.
1847 : */
1848 : static void
1849 502 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
1850 : {
1851 : ObjectAddress obj;
1852 : ListCell *lc;
1853 : Oid prid;
1854 :
1855 962 : foreach(lc, rels)
1856 : {
1857 478 : PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
1858 478 : Relation rel = pubrel->relation;
1859 478 : Oid relid = RelationGetRelid(rel);
1860 :
1861 478 : if (pubrel->columns)
1862 0 : ereport(ERROR,
1863 : errcode(ERRCODE_SYNTAX_ERROR),
1864 : errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
1865 :
1866 478 : prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
1867 : ObjectIdGetDatum(relid),
1868 : ObjectIdGetDatum(pubid));
1869 478 : if (!OidIsValid(prid))
1870 : {
1871 12 : if (missing_ok)
1872 0 : continue;
1873 :
1874 12 : ereport(ERROR,
1875 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1876 : errmsg("relation \"%s\" is not part of the publication",
1877 : RelationGetRelationName(rel))));
1878 : }
1879 :
1880 466 : if (pubrel->whereClause)
1881 6 : ereport(ERROR,
1882 : (errcode(ERRCODE_SYNTAX_ERROR),
1883 : errmsg("cannot use a WHERE clause when removing a table from a publication")));
1884 :
1885 460 : ObjectAddressSet(obj, PublicationRelRelationId, prid);
1886 460 : performDeletion(&obj, DROP_CASCADE, 0);
1887 : }
1888 484 : }
1889 :
1890 : /*
1891 : * Add listed schemas to the publication.
1892 : */
1893 : static void
1894 560 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
1895 : AlterPublicationStmt *stmt)
1896 : {
1897 : ListCell *lc;
1898 :
1899 : Assert(!stmt || !stmt->for_all_tables);
1900 :
1901 770 : foreach(lc, schemas)
1902 : {
1903 222 : Oid schemaid = lfirst_oid(lc);
1904 : ObjectAddress obj;
1905 :
1906 222 : obj = publication_add_schema(pubid, schemaid, if_not_exists);
1907 210 : if (stmt)
1908 : {
1909 58 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1910 : (Node *) stmt);
1911 :
1912 58 : InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
1913 : obj.objectId, 0);
1914 : }
1915 : }
1916 548 : }
1917 :
1918 : /*
1919 : * Remove listed schemas from the publication.
1920 : */
1921 : static void
1922 442 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
1923 : {
1924 : ObjectAddress obj;
1925 : ListCell *lc;
1926 : Oid psid;
1927 :
1928 492 : foreach(lc, schemas)
1929 : {
1930 56 : Oid schemaid = lfirst_oid(lc);
1931 :
1932 56 : psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
1933 : Anum_pg_publication_namespace_oid,
1934 : ObjectIdGetDatum(schemaid),
1935 : ObjectIdGetDatum(pubid));
1936 56 : if (!OidIsValid(psid))
1937 : {
1938 6 : if (missing_ok)
1939 0 : continue;
1940 :
1941 6 : ereport(ERROR,
1942 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1943 : errmsg("tables from schema \"%s\" are not part of the publication",
1944 : get_namespace_name(schemaid))));
1945 : }
1946 :
1947 50 : ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
1948 50 : performDeletion(&obj, DROP_CASCADE, 0);
1949 : }
1950 436 : }
1951 :
1952 : /*
1953 : * Internal workhorse for changing a publication owner
1954 : */
1955 : static void
1956 26 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
1957 : {
1958 : Form_pg_publication form;
1959 :
1960 26 : form = (Form_pg_publication) GETSTRUCT(tup);
1961 :
1962 26 : if (form->pubowner == newOwnerId)
1963 2 : return;
1964 :
1965 24 : if (!superuser())
1966 : {
1967 : AclResult aclresult;
1968 :
1969 : /* Must be owner */
1970 12 : if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
1971 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
1972 0 : NameStr(form->pubname));
1973 :
1974 : /* Must be able to become new owner */
1975 12 : check_can_set_role(GetUserId(), newOwnerId);
1976 :
1977 : /* New owner must have CREATE privilege on database */
1978 12 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
1979 12 : if (aclresult != ACLCHECK_OK)
1980 0 : aclcheck_error(aclresult, OBJECT_DATABASE,
1981 0 : get_database_name(MyDatabaseId));
1982 :
1983 12 : if (form->puballtables && !superuser_arg(newOwnerId))
1984 0 : ereport(ERROR,
1985 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1986 : errmsg("permission denied to change owner of publication \"%s\"",
1987 : NameStr(form->pubname)),
1988 : errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
1989 :
1990 12 : if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid))
1991 6 : ereport(ERROR,
1992 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1993 : errmsg("permission denied to change owner of publication \"%s\"",
1994 : NameStr(form->pubname)),
1995 : errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
1996 : }
1997 :
1998 18 : form->pubowner = newOwnerId;
1999 18 : CatalogTupleUpdate(rel, &tup->t_self, tup);
2000 :
2001 : /* Update owner dependency reference */
2002 18 : changeDependencyOnOwner(PublicationRelationId,
2003 : form->oid,
2004 : newOwnerId);
2005 :
2006 18 : InvokeObjectPostAlterHook(PublicationRelationId,
2007 : form->oid, 0);
2008 : }
2009 :
2010 : /*
2011 : * Change publication owner -- by name
2012 : */
2013 : ObjectAddress
2014 26 : AlterPublicationOwner(const char *name, Oid newOwnerId)
2015 : {
2016 : Oid subid;
2017 : HeapTuple tup;
2018 : Relation rel;
2019 : ObjectAddress address;
2020 : Form_pg_publication pubform;
2021 :
2022 26 : rel = table_open(PublicationRelationId, RowExclusiveLock);
2023 :
2024 26 : tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
2025 :
2026 26 : if (!HeapTupleIsValid(tup))
2027 0 : ereport(ERROR,
2028 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2029 : errmsg("publication \"%s\" does not exist", name)));
2030 :
2031 26 : pubform = (Form_pg_publication) GETSTRUCT(tup);
2032 26 : subid = pubform->oid;
2033 :
2034 26 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
2035 :
2036 20 : ObjectAddressSet(address, PublicationRelationId, subid);
2037 :
2038 20 : heap_freetuple(tup);
2039 :
2040 20 : table_close(rel, RowExclusiveLock);
2041 :
2042 20 : return address;
2043 : }
2044 :
2045 : /*
2046 : * Change publication owner -- by OID
2047 : */
2048 : void
2049 0 : AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
2050 : {
2051 : HeapTuple tup;
2052 : Relation rel;
2053 :
2054 0 : rel = table_open(PublicationRelationId, RowExclusiveLock);
2055 :
2056 0 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
2057 :
2058 0 : if (!HeapTupleIsValid(tup))
2059 0 : ereport(ERROR,
2060 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2061 : errmsg("publication with OID %u does not exist", subid)));
2062 :
2063 0 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
2064 :
2065 0 : heap_freetuple(tup);
2066 :
2067 0 : table_close(rel, RowExclusiveLock);
2068 0 : }
2069 :
2070 : /*
2071 : * Extract the publish_generated_columns option value from a DefElem. "stored"
2072 : * and "none" values are accepted.
2073 : */
2074 : static char
2075 70 : defGetGeneratedColsOption(DefElem *def)
2076 : {
2077 : char *sval;
2078 :
2079 : /*
2080 : * If no parameter value given, assume "stored" is meant.
2081 : */
2082 70 : if (!def->arg)
2083 6 : return PUBLISH_GENCOLS_STORED;
2084 :
2085 64 : sval = defGetString(def);
2086 :
2087 64 : if (pg_strcasecmp(sval, "none") == 0)
2088 22 : return PUBLISH_GENCOLS_NONE;
2089 42 : if (pg_strcasecmp(sval, "stored") == 0)
2090 36 : return PUBLISH_GENCOLS_STORED;
2091 :
2092 6 : ereport(ERROR,
2093 : errcode(ERRCODE_SYNTAX_ERROR),
2094 : errmsg("%s requires a \"none\" or \"stored\" value",
2095 : def->defname));
2096 :
2097 : return PUBLISH_GENCOLS_NONE; /* keep compiler quiet */
2098 : }
|