Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * publicationcmds.c
4 : * publication manipulation
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/publicationcmds.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/htup_details.h"
18 : #include "access/table.h"
19 : #include "access/xact.h"
20 : #include "catalog/catalog.h"
21 : #include "catalog/indexing.h"
22 : #include "catalog/namespace.h"
23 : #include "catalog/objectaccess.h"
24 : #include "catalog/objectaddress.h"
25 : #include "catalog/pg_database.h"
26 : #include "catalog/pg_inherits.h"
27 : #include "catalog/pg_namespace.h"
28 : #include "catalog/pg_proc.h"
29 : #include "catalog/pg_publication.h"
30 : #include "catalog/pg_publication_namespace.h"
31 : #include "catalog/pg_publication_rel.h"
32 : #include "commands/dbcommands.h"
33 : #include "commands/defrem.h"
34 : #include "commands/event_trigger.h"
35 : #include "commands/publicationcmds.h"
36 : #include "miscadmin.h"
37 : #include "nodes/nodeFuncs.h"
38 : #include "parser/parse_clause.h"
39 : #include "parser/parse_collate.h"
40 : #include "parser/parse_relation.h"
41 : #include "rewrite/rewriteHandler.h"
42 : #include "storage/lmgr.h"
43 : #include "utils/acl.h"
44 : #include "utils/builtins.h"
45 : #include "utils/inval.h"
46 : #include "utils/lsyscache.h"
47 : #include "utils/rel.h"
48 : #include "utils/syscache.h"
49 : #include "utils/varlena.h"
50 :
51 :
52 : /*
53 : * Information used to validate the columns in the row filter expression. See
54 : * contain_invalid_rfcolumn_walker for details.
55 : */
56 : typedef struct rf_context
57 : {
58 : Bitmapset *bms_replident; /* bitset of replica identity columns */
59 : bool pubviaroot; /* true if we are validating the parent
60 : * relation's row filter */
61 : Oid relid; /* relid of the relation */
62 : Oid parentid; /* relid of the parent relation */
63 : } rf_context;
64 :
65 : static List *OpenTableList(List *tables);
66 : static void CloseTableList(List *rels);
67 : static void LockSchemaList(List *schemalist);
68 : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
69 : AlterPublicationStmt *stmt);
70 : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
71 : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
72 : AlterPublicationStmt *stmt);
73 : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
74 : static char defGetGeneratedColsOption(DefElem *def);
75 :
76 :
77 : static void
78 908 : parse_publication_options(ParseState *pstate,
79 : List *options,
80 : bool *publish_given,
81 : PublicationActions *pubactions,
82 : bool *publish_via_partition_root_given,
83 : bool *publish_via_partition_root,
84 : bool *publish_generated_columns_given,
85 : char *publish_generated_columns)
86 : {
87 : ListCell *lc;
88 :
89 908 : *publish_given = false;
90 908 : *publish_via_partition_root_given = false;
91 908 : *publish_generated_columns_given = false;
92 :
93 : /* defaults */
94 908 : pubactions->pubinsert = true;
95 908 : pubactions->pubupdate = true;
96 908 : pubactions->pubdelete = true;
97 908 : pubactions->pubtruncate = true;
98 908 : *publish_via_partition_root = false;
99 908 : *publish_generated_columns = PUBLISH_GENCOLS_NONE;
100 :
101 : /* Parse options */
102 1246 : foreach(lc, options)
103 : {
104 368 : DefElem *defel = (DefElem *) lfirst(lc);
105 :
106 368 : if (strcmp(defel->defname, "publish") == 0)
107 : {
108 : char *publish;
109 : List *publish_list;
110 : ListCell *lc2;
111 :
112 122 : if (*publish_given)
113 0 : errorConflictingDefElem(defel, pstate);
114 :
115 : /*
116 : * If publish option was given only the explicitly listed actions
117 : * should be published.
118 : */
119 122 : pubactions->pubinsert = false;
120 122 : pubactions->pubupdate = false;
121 122 : pubactions->pubdelete = false;
122 122 : pubactions->pubtruncate = false;
123 :
124 122 : *publish_given = true;
125 122 : publish = defGetString(defel);
126 :
127 122 : if (!SplitIdentifierString(publish, ',', &publish_list))
128 0 : ereport(ERROR,
129 : (errcode(ERRCODE_SYNTAX_ERROR),
130 : errmsg("invalid list syntax in parameter \"%s\"",
131 : "publish")));
132 :
133 : /* Process the option list. */
134 270 : foreach(lc2, publish_list)
135 : {
136 154 : char *publish_opt = (char *) lfirst(lc2);
137 :
138 154 : if (strcmp(publish_opt, "insert") == 0)
139 108 : pubactions->pubinsert = true;
140 46 : else if (strcmp(publish_opt, "update") == 0)
141 20 : pubactions->pubupdate = true;
142 26 : else if (strcmp(publish_opt, "delete") == 0)
143 10 : pubactions->pubdelete = true;
144 16 : else if (strcmp(publish_opt, "truncate") == 0)
145 10 : pubactions->pubtruncate = true;
146 : else
147 6 : ereport(ERROR,
148 : (errcode(ERRCODE_SYNTAX_ERROR),
149 : errmsg("unrecognized value for publication option \"%s\": \"%s\"",
150 : "publish", publish_opt)));
151 : }
152 : }
153 246 : else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
154 : {
155 164 : if (*publish_via_partition_root_given)
156 6 : errorConflictingDefElem(defel, pstate);
157 158 : *publish_via_partition_root_given = true;
158 158 : *publish_via_partition_root = defGetBoolean(defel);
159 : }
160 82 : else if (strcmp(defel->defname, "publish_generated_columns") == 0)
161 : {
162 76 : if (*publish_generated_columns_given)
163 6 : errorConflictingDefElem(defel, pstate);
164 70 : *publish_generated_columns_given = true;
165 70 : *publish_generated_columns = defGetGeneratedColsOption(defel);
166 : }
167 : else
168 6 : ereport(ERROR,
169 : (errcode(ERRCODE_SYNTAX_ERROR),
170 : errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
171 : }
172 878 : }
173 :
174 : /*
175 : * Convert the PublicationObjSpecType list into schema oid list and
176 : * PublicationTable list.
177 : */
178 : static void
179 1618 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
180 : List **rels, List **schemas)
181 : {
182 : ListCell *cell;
183 : PublicationObjSpec *pubobj;
184 :
185 1618 : if (!pubobjspec_list)
186 88 : return;
187 :
188 3222 : foreach(cell, pubobjspec_list)
189 : {
190 : Oid schemaid;
191 : List *search_path;
192 :
193 1728 : pubobj = (PublicationObjSpec *) lfirst(cell);
194 :
195 1728 : switch (pubobj->pubobjtype)
196 : {
197 1372 : case PUBLICATIONOBJ_TABLE:
198 1372 : *rels = lappend(*rels, pubobj->pubtable);
199 1372 : break;
200 332 : case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
201 332 : schemaid = get_namespace_oid(pubobj->name, false);
202 :
203 : /* Filter out duplicates if user specifies "sch1, sch1" */
204 302 : *schemas = list_append_unique_oid(*schemas, schemaid);
205 302 : break;
206 24 : case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
207 24 : search_path = fetch_search_path(false);
208 24 : if (search_path == NIL) /* nothing valid in search_path? */
209 6 : ereport(ERROR,
210 : errcode(ERRCODE_UNDEFINED_SCHEMA),
211 : errmsg("no schema has been selected for CURRENT_SCHEMA"));
212 :
213 18 : schemaid = linitial_oid(search_path);
214 18 : list_free(search_path);
215 :
216 : /* Filter out duplicates if user specifies "sch1, sch1" */
217 18 : *schemas = list_append_unique_oid(*schemas, schemaid);
218 18 : break;
219 0 : default:
220 : /* shouldn't happen */
221 0 : elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
222 : break;
223 : }
224 : }
225 : }
226 :
227 : /*
228 : * Returns true if any of the columns used in the row filter WHERE expression is
229 : * not part of REPLICA IDENTITY, false otherwise.
230 : */
231 : static bool
232 258 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
233 : {
234 258 : if (node == NULL)
235 0 : return false;
236 :
237 258 : if (IsA(node, Var))
238 : {
239 104 : Var *var = (Var *) node;
240 104 : AttrNumber attnum = var->varattno;
241 :
242 : /*
243 : * If pubviaroot is true, we are validating the row filter of the
244 : * parent table, but the bitmap contains the replica identity
245 : * information of the child table. So, get the column number of the
246 : * child table as parent and child column order could be different.
247 : */
248 104 : if (context->pubviaroot)
249 : {
250 16 : char *colname = get_attname(context->parentid, attnum, false);
251 :
252 16 : attnum = get_attnum(context->relid, colname);
253 : }
254 :
255 104 : if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
256 104 : context->bms_replident))
257 60 : return true;
258 : }
259 :
260 198 : return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
261 : context);
262 : }
263 :
264 : /*
265 : * Check if all columns referenced in the filter expression are part of the
266 : * REPLICA IDENTITY index or not.
267 : *
268 : * Returns true if any invalid column is found.
269 : */
270 : bool
271 712 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
272 : bool pubviaroot)
273 : {
274 : HeapTuple rftuple;
275 712 : Oid relid = RelationGetRelid(relation);
276 712 : Oid publish_as_relid = RelationGetRelid(relation);
277 712 : bool result = false;
278 : Datum rfdatum;
279 : bool rfisnull;
280 :
281 : /*
282 : * FULL means all columns are in the REPLICA IDENTITY, so all columns are
283 : * allowed in the row filter and we can skip the validation.
284 : */
285 712 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
286 186 : return false;
287 :
288 : /*
289 : * For a partition, if pubviaroot is true, find the topmost ancestor that
290 : * is published via this publication as we need to use its row filter
291 : * expression to filter the partition's changes.
292 : *
293 : * Note that even though the row filter used is for an ancestor, the
294 : * REPLICA IDENTITY used will be for the actual child table.
295 : */
296 526 : if (pubviaroot && relation->rd_rel->relispartition)
297 : {
298 : publish_as_relid
299 108 : = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
300 :
301 108 : if (!OidIsValid(publish_as_relid))
302 6 : publish_as_relid = relid;
303 : }
304 :
305 526 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
306 : ObjectIdGetDatum(publish_as_relid),
307 : ObjectIdGetDatum(pubid));
308 :
309 526 : if (!HeapTupleIsValid(rftuple))
310 56 : return false;
311 :
312 470 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
313 : Anum_pg_publication_rel_prqual,
314 : &rfisnull);
315 :
316 470 : if (!rfisnull)
317 : {
318 102 : rf_context context = {0};
319 : Node *rfnode;
320 102 : Bitmapset *bms = NULL;
321 :
322 102 : context.pubviaroot = pubviaroot;
323 102 : context.parentid = publish_as_relid;
324 102 : context.relid = relid;
325 :
326 : /* Remember columns that are part of the REPLICA IDENTITY */
327 102 : bms = RelationGetIndexAttrBitmap(relation,
328 : INDEX_ATTR_BITMAP_IDENTITY_KEY);
329 :
330 102 : context.bms_replident = bms;
331 102 : rfnode = stringToNode(TextDatumGetCString(rfdatum));
332 102 : result = contain_invalid_rfcolumn_walker(rfnode, &context);
333 : }
334 :
335 470 : ReleaseSysCache(rftuple);
336 :
337 470 : return result;
338 : }
339 :
340 : /*
341 : * Check for invalid columns in the publication table definition.
342 : *
343 : * This function evaluates two conditions:
344 : *
345 : * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
346 : * by the column list. If any column is missing, *invalid_column_list is set
347 : * to true.
348 : * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
349 : * are published, either by being explicitly named in the column list or, if
350 : * no column list is specified, by setting the option
351 : * publish_generated_columns to stored. If any unpublished
352 : * generated column is found, *invalid_gen_col is set to true.
353 : *
354 : * Returns true if any of the above conditions are not met.
355 : */
356 : bool
357 904 : pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
358 : bool pubviaroot, char pubgencols_type,
359 : bool *invalid_column_list,
360 : bool *invalid_gen_col)
361 : {
362 904 : Oid relid = RelationGetRelid(relation);
363 904 : Oid publish_as_relid = RelationGetRelid(relation);
364 : Bitmapset *idattrs;
365 904 : Bitmapset *columns = NULL;
366 904 : TupleDesc desc = RelationGetDescr(relation);
367 : Publication *pub;
368 : int x;
369 :
370 904 : *invalid_column_list = false;
371 904 : *invalid_gen_col = false;
372 :
373 : /*
374 : * For a partition, if pubviaroot is true, find the topmost ancestor that
375 : * is published via this publication as we need to use its column list for
376 : * the changes.
377 : *
378 : * Note that even though the column list used is for an ancestor, the
379 : * REPLICA IDENTITY used will be for the actual child table.
380 : */
381 904 : if (pubviaroot && relation->rd_rel->relispartition)
382 : {
383 160 : publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
384 :
385 160 : if (!OidIsValid(publish_as_relid))
386 36 : publish_as_relid = relid;
387 : }
388 :
389 : /* Fetch the column list */
390 904 : pub = GetPublication(pubid);
391 904 : check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
392 :
393 904 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
394 : {
395 : /* With REPLICA IDENTITY FULL, no column list is allowed. */
396 200 : *invalid_column_list = (columns != NULL);
397 :
398 : /*
399 : * As we don't allow a column list with REPLICA IDENTITY FULL, the
400 : * publish_generated_columns option must be set to stored if the table
401 : * has any stored generated columns.
402 : */
403 200 : if (pubgencols_type != PUBLISH_GENCOLS_STORED &&
404 188 : relation->rd_att->constr &&
405 76 : relation->rd_att->constr->has_generated_stored)
406 6 : *invalid_gen_col = true;
407 :
408 : /*
409 : * Virtual generated columns are currently not supported for logical
410 : * replication at all.
411 : */
412 200 : if (relation->rd_att->constr &&
413 88 : relation->rd_att->constr->has_generated_virtual)
414 12 : *invalid_gen_col = true;
415 :
416 200 : if (*invalid_gen_col && *invalid_column_list)
417 0 : return true;
418 : }
419 :
420 : /* Remember columns that are part of the REPLICA IDENTITY */
421 904 : idattrs = RelationGetIndexAttrBitmap(relation,
422 : INDEX_ATTR_BITMAP_IDENTITY_KEY);
423 :
424 : /*
425 : * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
426 : * (to handle system columns the usual way), while column list does not
427 : * use offset, so we can't do bms_is_subset(). Instead, we have to loop
428 : * over the idattrs and check all of them are in the list.
429 : */
430 904 : x = -1;
431 1552 : while ((x = bms_next_member(idattrs, x)) >= 0)
432 : {
433 654 : AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber);
434 654 : Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
435 :
436 654 : if (columns == NULL)
437 : {
438 : /*
439 : * The publish_generated_columns option must be set to stored if
440 : * the REPLICA IDENTITY contains any stored generated column.
441 : */
442 444 : if (att->attgenerated == ATTRIBUTE_GENERATED_STORED && pubgencols_type != PUBLISH_GENCOLS_STORED)
443 : {
444 6 : *invalid_gen_col = true;
445 6 : break;
446 : }
447 :
448 : /*
449 : * The equivalent setting for virtual generated columns does not
450 : * exist yet.
451 : */
452 438 : if (att->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
453 : {
454 0 : *invalid_gen_col = true;
455 0 : break;
456 : }
457 :
458 : /* Skip validating the column list since it is not defined */
459 438 : continue;
460 : }
461 :
462 : /*
463 : * If pubviaroot is true, we are validating the column list of the
464 : * parent table, but the bitmap contains the replica identity
465 : * information of the child table. The parent/child attnums may not
466 : * match, so translate them to the parent - get the attname from the
467 : * child, and look it up in the parent.
468 : */
469 210 : if (pubviaroot)
470 : {
471 : /* attribute name in the child table */
472 80 : char *colname = get_attname(relid, attnum, false);
473 :
474 : /*
475 : * Determine the attnum for the attribute name in parent (we are
476 : * using the column list defined on the parent).
477 : */
478 80 : attnum = get_attnum(publish_as_relid, colname);
479 : }
480 :
481 : /* replica identity column, not covered by the column list */
482 210 : *invalid_column_list |= !bms_is_member(attnum, columns);
483 :
484 210 : if (*invalid_column_list && *invalid_gen_col)
485 0 : break;
486 : }
487 :
488 904 : bms_free(columns);
489 904 : bms_free(idattrs);
490 :
491 904 : return *invalid_column_list || *invalid_gen_col;
492 : }
493 :
494 : /*
495 : * Invalidate entries in the RelationSyncCache for relations included in the
496 : * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
497 : *
498 : * If 'puballtables' is true, invalidate all cache entries.
499 : */
500 : void
501 36 : InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
502 : {
503 36 : if (puballtables)
504 : {
505 6 : CacheInvalidateRelSyncAll();
506 : }
507 : else
508 : {
509 30 : List *relids = NIL;
510 30 : List *schemarelids = NIL;
511 :
512 : /*
513 : * For partitioned tables, we must invalidate all partitions and
514 : * itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as
515 : * a target. However, WAL records for TRUNCATE specify both a root and
516 : * its leaves.
517 : */
518 30 : relids = GetPublicationRelations(pubid,
519 : PUBLICATION_PART_ALL);
520 30 : schemarelids = GetAllSchemaPublicationRelations(pubid,
521 : PUBLICATION_PART_ALL);
522 :
523 30 : relids = list_concat_unique_oid(relids, schemarelids);
524 :
525 : /* Invalidate the relsyncache */
526 66 : foreach_oid(relid, relids)
527 6 : CacheInvalidateRelSync(relid);
528 : }
529 :
530 36 : return;
531 : }
532 :
533 : /* check_functions_in_node callback */
534 : static bool
535 430 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
536 : {
537 430 : return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
538 : func_id >= FirstNormalObjectId);
539 : }
540 :
541 : /*
542 : * The row filter walker checks if the row filter expression is a "simple
543 : * expression".
544 : *
545 : * It allows only simple or compound expressions such as:
546 : * - (Var Op Const)
547 : * - (Var Op Var)
548 : * - (Var Op Const) AND/OR (Var Op Const)
549 : * - etc
550 : * (where Var is a column of the table this filter belongs to)
551 : *
552 : * The simple expression has the following restrictions:
553 : * - User-defined operators are not allowed;
554 : * - User-defined functions are not allowed;
555 : * - User-defined types are not allowed;
556 : * - User-defined collations are not allowed;
557 : * - Non-immutable built-in functions are not allowed;
558 : * - System columns are not allowed.
559 : *
560 : * NOTES
561 : *
562 : * We don't allow user-defined functions/operators/types/collations because
563 : * (a) if a user drops a user-defined object used in a row filter expression or
564 : * if there is any other error while using it, the logical decoding
565 : * infrastructure won't be able to recover from such an error even if the
566 : * object is recreated again because a historic snapshot is used to evaluate
567 : * the row filter;
568 : * (b) a user-defined function can be used to access tables that could have
569 : * unpleasant results because a historic snapshot is used. That's why only
570 : * immutable built-in functions are allowed in row filter expressions.
571 : *
572 : * We don't allow system columns because currently, we don't have that
573 : * information in the tuple passed to downstream. Also, as we don't replicate
574 : * those to subscribers, there doesn't seem to be a need for a filter on those
575 : * columns.
576 : *
577 : * We can allow other node types after more analysis and testing.
578 : */
579 : static bool
580 1392 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
581 : {
582 1392 : char *errdetail_msg = NULL;
583 :
584 1392 : if (node == NULL)
585 6 : return false;
586 :
587 1386 : switch (nodeTag(node))
588 : {
589 370 : case T_Var:
590 : /* System columns are not allowed. */
591 370 : if (((Var *) node)->varattno < InvalidAttrNumber)
592 6 : errdetail_msg = _("System columns are not allowed.");
593 370 : break;
594 380 : case T_OpExpr:
595 : case T_DistinctExpr:
596 : case T_NullIfExpr:
597 : /* OK, except user-defined operators are not allowed. */
598 380 : if (((OpExpr *) node)->opno >= FirstNormalObjectId)
599 6 : errdetail_msg = _("User-defined operators are not allowed.");
600 380 : break;
601 6 : case T_ScalarArrayOpExpr:
602 : /* OK, except user-defined operators are not allowed. */
603 6 : if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
604 0 : errdetail_msg = _("User-defined operators are not allowed.");
605 :
606 : /*
607 : * We don't need to check the hashfuncid and negfuncid of
608 : * ScalarArrayOpExpr as those functions are only built for a
609 : * subquery.
610 : */
611 6 : break;
612 6 : case T_RowCompareExpr:
613 : {
614 : ListCell *opid;
615 :
616 : /* OK, except user-defined operators are not allowed. */
617 18 : foreach(opid, ((RowCompareExpr *) node)->opnos)
618 : {
619 12 : if (lfirst_oid(opid) >= FirstNormalObjectId)
620 : {
621 0 : errdetail_msg = _("User-defined operators are not allowed.");
622 0 : break;
623 : }
624 : }
625 : }
626 6 : break;
627 618 : case T_Const:
628 : case T_FuncExpr:
629 : case T_BoolExpr:
630 : case T_RelabelType:
631 : case T_CollateExpr:
632 : case T_CaseExpr:
633 : case T_CaseTestExpr:
634 : case T_ArrayExpr:
635 : case T_RowExpr:
636 : case T_CoalesceExpr:
637 : case T_MinMaxExpr:
638 : case T_XmlExpr:
639 : case T_NullTest:
640 : case T_BooleanTest:
641 : case T_List:
642 : /* OK, supported */
643 618 : break;
644 6 : default:
645 6 : errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
646 6 : break;
647 : }
648 :
649 : /*
650 : * For all the supported nodes, if we haven't already found a problem,
651 : * check the types, functions, and collations used in it. We check List
652 : * by walking through each element.
653 : */
654 1386 : if (!errdetail_msg && !IsA(node, List))
655 : {
656 1314 : if (exprType(node) >= FirstNormalObjectId)
657 6 : errdetail_msg = _("User-defined types are not allowed.");
658 1308 : else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
659 : pstate))
660 18 : errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
661 2580 : else if (exprCollation(node) >= FirstNormalObjectId ||
662 1290 : exprInputCollation(node) >= FirstNormalObjectId)
663 6 : errdetail_msg = _("User-defined collations are not allowed.");
664 : }
665 :
666 : /*
667 : * If we found a problem in this node, throw error now. Otherwise keep
668 : * going.
669 : */
670 1386 : if (errdetail_msg)
671 48 : ereport(ERROR,
672 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
673 : errmsg("invalid publication WHERE expression"),
674 : errdetail_internal("%s", errdetail_msg),
675 : parser_errposition(pstate, exprLocation(node))));
676 :
677 1338 : return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
678 : pstate);
679 : }
680 :
681 : /*
682 : * Check if the row filter expression is a "simple expression".
683 : *
684 : * See check_simple_rowfilter_expr_walker for details.
685 : */
686 : static bool
687 358 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
688 : {
689 358 : return check_simple_rowfilter_expr_walker(node, pstate);
690 : }
691 :
692 : /*
693 : * Transform the publication WHERE expression for all the relations in the list,
694 : * ensuring it is coerced to boolean and necessary collation information is
695 : * added if required, and add a new nsitem/RTE for the associated relation to
696 : * the ParseState's namespace list.
697 : *
698 : * Also check the publication row filter expression and throw an error if
699 : * anything not permitted or unexpected is encountered.
700 : */
701 : static void
702 1150 : TransformPubWhereClauses(List *tables, const char *queryString,
703 : bool pubviaroot)
704 : {
705 : ListCell *lc;
706 :
707 2294 : foreach(lc, tables)
708 : {
709 : ParseNamespaceItem *nsitem;
710 1210 : Node *whereclause = NULL;
711 : ParseState *pstate;
712 1210 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
713 :
714 1210 : if (pri->whereClause == NULL)
715 834 : continue;
716 :
717 : /*
718 : * If the publication doesn't publish changes via the root partitioned
719 : * table, the partition's row filter will be used. So disallow using
720 : * WHERE clause on partitioned table in this case.
721 : */
722 376 : if (!pubviaroot &&
723 354 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
724 6 : ereport(ERROR,
725 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
726 : errmsg("cannot use publication WHERE clause for relation \"%s\"",
727 : RelationGetRelationName(pri->relation)),
728 : errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
729 : "publish_via_partition_root")));
730 :
731 : /*
732 : * A fresh pstate is required so that we only have "this" table in its
733 : * rangetable
734 : */
735 370 : pstate = make_parsestate(NULL);
736 370 : pstate->p_sourcetext = queryString;
737 370 : nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
738 : AccessShareLock, NULL,
739 : false, false);
740 370 : addNSItemToQuery(pstate, nsitem, false, true, true);
741 :
742 370 : whereclause = transformWhereClause(pstate,
743 370 : copyObject(pri->whereClause),
744 : EXPR_KIND_WHERE,
745 : "PUBLICATION WHERE");
746 :
747 : /* Fix up collation information */
748 358 : assign_expr_collations(pstate, whereclause);
749 :
750 358 : whereclause = expand_generated_columns_in_expr(whereclause, pri->relation, 1);
751 :
752 : /*
753 : * We allow only simple expressions in row filters. See
754 : * check_simple_rowfilter_expr_walker.
755 : */
756 358 : check_simple_rowfilter_expr(whereclause, pstate);
757 :
758 310 : free_parsestate(pstate);
759 :
760 310 : pri->whereClause = whereclause;
761 : }
762 1084 : }
763 :
764 :
765 : /*
766 : * Given a list of tables that are going to be added to a publication,
767 : * verify that they fulfill the necessary preconditions, namely: no tables
768 : * have a column list if any schema is published; and partitioned tables do
769 : * not have column lists if publish_via_partition_root is not set.
770 : *
771 : * 'publish_schema' indicates that the publication contains any TABLES IN
772 : * SCHEMA elements (newly added in this command, or preexisting).
773 : * 'pubviaroot' is the value of publish_via_partition_root.
774 : */
775 : static void
776 1084 : CheckPubRelationColumnList(char *pubname, List *tables,
777 : bool publish_schema, bool pubviaroot)
778 : {
779 : ListCell *lc;
780 :
781 2198 : foreach(lc, tables)
782 : {
783 1144 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
784 :
785 1144 : if (pri->columns == NIL)
786 764 : continue;
787 :
788 : /*
789 : * Disallow specifying column list if any schema is in the
790 : * publication.
791 : *
792 : * XXX We could instead just forbid the case when the publication
793 : * tries to publish the table with a column list and a schema for that
794 : * table. However, if we do that then we need a restriction during
795 : * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
796 : * seem to be a good idea.
797 : */
798 380 : if (publish_schema)
799 24 : ereport(ERROR,
800 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
801 : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
802 : get_namespace_name(RelationGetNamespace(pri->relation)),
803 : RelationGetRelationName(pri->relation), pubname),
804 : errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
805 :
806 : /*
807 : * If the publication doesn't publish changes via the root partitioned
808 : * table, the partition's column list will be used. So disallow using
809 : * a column list on the partitioned table in this case.
810 : */
811 356 : if (!pubviaroot &&
812 280 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
813 6 : ereport(ERROR,
814 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
815 : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
816 : get_namespace_name(RelationGetNamespace(pri->relation)),
817 : RelationGetRelationName(pri->relation), pubname),
818 : errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
819 : "publish_via_partition_root")));
820 : }
821 1054 : }
822 :
823 : /*
824 : * Create new publication.
825 : */
826 : ObjectAddress
827 804 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
828 : {
829 : Relation rel;
830 : ObjectAddress myself;
831 : Oid puboid;
832 : bool nulls[Natts_pg_publication];
833 : Datum values[Natts_pg_publication];
834 : HeapTuple tup;
835 : bool publish_given;
836 : PublicationActions pubactions;
837 : bool publish_via_partition_root_given;
838 : bool publish_via_partition_root;
839 : bool publish_generated_columns_given;
840 : char publish_generated_columns;
841 : AclResult aclresult;
842 804 : List *relations = NIL;
843 804 : List *schemaidlist = NIL;
844 :
845 : /* must have CREATE privilege on database */
846 804 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
847 804 : if (aclresult != ACLCHECK_OK)
848 6 : aclcheck_error(aclresult, OBJECT_DATABASE,
849 6 : get_database_name(MyDatabaseId));
850 :
851 : /* FOR ALL TABLES requires superuser */
852 798 : if (stmt->for_all_tables && !superuser())
853 0 : ereport(ERROR,
854 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
855 : errmsg("must be superuser to create FOR ALL TABLES publication")));
856 :
857 798 : rel = table_open(PublicationRelationId, RowExclusiveLock);
858 :
859 : /* Check if name is used */
860 798 : puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
861 : CStringGetDatum(stmt->pubname));
862 798 : if (OidIsValid(puboid))
863 6 : ereport(ERROR,
864 : (errcode(ERRCODE_DUPLICATE_OBJECT),
865 : errmsg("publication \"%s\" already exists",
866 : stmt->pubname)));
867 :
868 : /* Form a tuple. */
869 792 : memset(values, 0, sizeof(values));
870 792 : memset(nulls, false, sizeof(nulls));
871 :
872 792 : values[Anum_pg_publication_pubname - 1] =
873 792 : DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
874 792 : values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
875 :
876 792 : parse_publication_options(pstate,
877 : stmt->options,
878 : &publish_given, &pubactions,
879 : &publish_via_partition_root_given,
880 : &publish_via_partition_root,
881 : &publish_generated_columns_given,
882 : &publish_generated_columns);
883 :
884 762 : puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
885 : Anum_pg_publication_oid);
886 762 : values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
887 762 : values[Anum_pg_publication_puballtables - 1] =
888 762 : BoolGetDatum(stmt->for_all_tables);
889 762 : values[Anum_pg_publication_pubinsert - 1] =
890 762 : BoolGetDatum(pubactions.pubinsert);
891 762 : values[Anum_pg_publication_pubupdate - 1] =
892 762 : BoolGetDatum(pubactions.pubupdate);
893 762 : values[Anum_pg_publication_pubdelete - 1] =
894 762 : BoolGetDatum(pubactions.pubdelete);
895 762 : values[Anum_pg_publication_pubtruncate - 1] =
896 762 : BoolGetDatum(pubactions.pubtruncate);
897 762 : values[Anum_pg_publication_pubviaroot - 1] =
898 762 : BoolGetDatum(publish_via_partition_root);
899 762 : values[Anum_pg_publication_pubgencols - 1] =
900 762 : CharGetDatum(publish_generated_columns);
901 :
902 762 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
903 :
904 : /* Insert tuple into catalog. */
905 762 : CatalogTupleInsert(rel, tup);
906 762 : heap_freetuple(tup);
907 :
908 762 : recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
909 :
910 762 : ObjectAddressSet(myself, PublicationRelationId, puboid);
911 :
912 : /* Make the changes visible. */
913 762 : CommandCounterIncrement();
914 :
915 : /* Associate objects with the publication. */
916 762 : if (stmt->for_all_tables)
917 : {
918 : /* Invalidate relcache so that publication info is rebuilt. */
919 94 : CacheInvalidateRelcacheAll();
920 : }
921 : else
922 : {
923 668 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
924 : &schemaidlist);
925 :
926 : /* FOR TABLES IN SCHEMA requires superuser */
927 650 : if (schemaidlist != NIL && !superuser())
928 6 : ereport(ERROR,
929 : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
930 : errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
931 :
932 644 : if (relations != NIL)
933 : {
934 : List *rels;
935 :
936 448 : rels = OpenTableList(relations);
937 424 : TransformPubWhereClauses(rels, pstate->p_sourcetext,
938 : publish_via_partition_root);
939 :
940 394 : CheckPubRelationColumnList(stmt->pubname, rels,
941 : schemaidlist != NIL,
942 : publish_via_partition_root);
943 :
944 388 : PublicationAddTables(puboid, rels, true, NULL);
945 362 : CloseTableList(rels);
946 : }
947 :
948 558 : if (schemaidlist != NIL)
949 : {
950 : /*
951 : * Schema lock is held until the publication is created to prevent
952 : * concurrent schema deletion.
953 : */
954 134 : LockSchemaList(schemaidlist);
955 134 : PublicationAddSchemas(puboid, schemaidlist, true, NULL);
956 : }
957 : }
958 :
959 646 : table_close(rel, RowExclusiveLock);
960 :
961 646 : InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
962 :
963 646 : if (wal_level != WAL_LEVEL_LOGICAL)
964 368 : ereport(WARNING,
965 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
966 : errmsg("\"wal_level\" is insufficient to publish logical changes"),
967 : errhint("Set \"wal_level\" to \"logical\" before creating subscriptions.")));
968 :
969 646 : return myself;
970 : }
971 :
972 : /*
973 : * Change options of a publication.
974 : */
975 : static void
976 116 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
977 : Relation rel, HeapTuple tup)
978 : {
979 : bool nulls[Natts_pg_publication];
980 : bool replaces[Natts_pg_publication];
981 : Datum values[Natts_pg_publication];
982 : bool publish_given;
983 : PublicationActions pubactions;
984 : bool publish_via_partition_root_given;
985 : bool publish_via_partition_root;
986 : bool publish_generated_columns_given;
987 : char publish_generated_columns;
988 : ObjectAddress obj;
989 : Form_pg_publication pubform;
990 116 : List *root_relids = NIL;
991 : ListCell *lc;
992 :
993 116 : parse_publication_options(pstate,
994 : stmt->options,
995 : &publish_given, &pubactions,
996 : &publish_via_partition_root_given,
997 : &publish_via_partition_root,
998 : &publish_generated_columns_given,
999 : &publish_generated_columns);
1000 :
1001 116 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1002 :
1003 : /*
1004 : * If the publication doesn't publish changes via the root partitioned
1005 : * table, the partition's row filter and column list will be used. So
1006 : * disallow using WHERE clause and column lists on partitioned table in
1007 : * this case.
1008 : */
1009 116 : if (!pubform->puballtables && publish_via_partition_root_given &&
1010 80 : !publish_via_partition_root)
1011 : {
1012 : /*
1013 : * Lock the publication so nobody else can do anything with it. This
1014 : * prevents concurrent alter to add partitioned table(s) with WHERE
1015 : * clause(s) and/or column lists which we don't allow when not
1016 : * publishing via root.
1017 : */
1018 48 : LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
1019 : AccessShareLock);
1020 :
1021 48 : root_relids = GetPublicationRelations(pubform->oid,
1022 : PUBLICATION_PART_ROOT);
1023 :
1024 84 : foreach(lc, root_relids)
1025 : {
1026 48 : Oid relid = lfirst_oid(lc);
1027 : HeapTuple rftuple;
1028 : char relkind;
1029 : char *relname;
1030 : bool has_rowfilter;
1031 : bool has_collist;
1032 :
1033 : /*
1034 : * Beware: we don't have lock on the relations, so cope silently
1035 : * with the cache lookups returning NULL.
1036 : */
1037 :
1038 48 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1039 : ObjectIdGetDatum(relid),
1040 : ObjectIdGetDatum(pubform->oid));
1041 48 : if (!HeapTupleIsValid(rftuple))
1042 0 : continue;
1043 48 : has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
1044 48 : has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
1045 48 : if (!has_rowfilter && !has_collist)
1046 : {
1047 12 : ReleaseSysCache(rftuple);
1048 12 : continue;
1049 : }
1050 :
1051 36 : relkind = get_rel_relkind(relid);
1052 36 : if (relkind != RELKIND_PARTITIONED_TABLE)
1053 : {
1054 24 : ReleaseSysCache(rftuple);
1055 24 : continue;
1056 : }
1057 12 : relname = get_rel_name(relid);
1058 12 : if (relname == NULL) /* table concurrently dropped */
1059 : {
1060 0 : ReleaseSysCache(rftuple);
1061 0 : continue;
1062 : }
1063 :
1064 12 : if (has_rowfilter)
1065 6 : ereport(ERROR,
1066 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1067 : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1068 : "publish_via_partition_root",
1069 : stmt->pubname),
1070 : errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1071 : relname, "publish_via_partition_root")));
1072 : Assert(has_collist);
1073 6 : ereport(ERROR,
1074 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1075 : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1076 : "publish_via_partition_root",
1077 : stmt->pubname),
1078 : errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1079 : relname, "publish_via_partition_root")));
1080 : }
1081 : }
1082 :
1083 : /* Everything ok, form a new tuple. */
1084 104 : memset(values, 0, sizeof(values));
1085 104 : memset(nulls, false, sizeof(nulls));
1086 104 : memset(replaces, false, sizeof(replaces));
1087 :
1088 104 : if (publish_given)
1089 : {
1090 28 : values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
1091 28 : replaces[Anum_pg_publication_pubinsert - 1] = true;
1092 :
1093 28 : values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
1094 28 : replaces[Anum_pg_publication_pubupdate - 1] = true;
1095 :
1096 28 : values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
1097 28 : replaces[Anum_pg_publication_pubdelete - 1] = true;
1098 :
1099 28 : values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
1100 28 : replaces[Anum_pg_publication_pubtruncate - 1] = true;
1101 : }
1102 :
1103 104 : if (publish_via_partition_root_given)
1104 : {
1105 70 : values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
1106 70 : replaces[Anum_pg_publication_pubviaroot - 1] = true;
1107 : }
1108 :
1109 104 : if (publish_generated_columns_given)
1110 : {
1111 6 : values[Anum_pg_publication_pubgencols - 1] = CharGetDatum(publish_generated_columns);
1112 6 : replaces[Anum_pg_publication_pubgencols - 1] = true;
1113 : }
1114 :
1115 104 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1116 : replaces);
1117 :
1118 : /* Update the catalog. */
1119 104 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1120 :
1121 104 : CommandCounterIncrement();
1122 :
1123 104 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1124 :
1125 : /* Invalidate the relcache. */
1126 104 : if (pubform->puballtables)
1127 : {
1128 8 : CacheInvalidateRelcacheAll();
1129 : }
1130 : else
1131 : {
1132 96 : List *relids = NIL;
1133 96 : List *schemarelids = NIL;
1134 :
1135 : /*
1136 : * For any partitioned tables contained in the publication, we must
1137 : * invalidate all partitions contained in the respective partition
1138 : * trees, not just those explicitly mentioned in the publication.
1139 : */
1140 96 : if (root_relids == NIL)
1141 60 : relids = GetPublicationRelations(pubform->oid,
1142 : PUBLICATION_PART_ALL);
1143 : else
1144 : {
1145 : /*
1146 : * We already got tables explicitly mentioned in the publication.
1147 : * Now get all partitions for the partitioned table in the list.
1148 : */
1149 72 : foreach(lc, root_relids)
1150 36 : relids = GetPubPartitionOptionRelations(relids,
1151 : PUBLICATION_PART_ALL,
1152 : lfirst_oid(lc));
1153 : }
1154 :
1155 96 : schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
1156 : PUBLICATION_PART_ALL);
1157 96 : relids = list_concat_unique_oid(relids, schemarelids);
1158 :
1159 96 : InvalidatePublicationRels(relids);
1160 : }
1161 :
1162 104 : ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
1163 104 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1164 : (Node *) stmt);
1165 :
1166 104 : InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
1167 104 : }
1168 :
1169 : /*
1170 : * Invalidate the relations.
1171 : */
1172 : void
1173 2344 : InvalidatePublicationRels(List *relids)
1174 : {
1175 : /*
1176 : * We don't want to send too many individual messages, at some point it's
1177 : * cheaper to just reset whole relcache.
1178 : */
1179 2344 : if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
1180 : {
1181 : ListCell *lc;
1182 :
1183 19568 : foreach(lc, relids)
1184 17224 : CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
1185 : }
1186 : else
1187 0 : CacheInvalidateRelcacheAll();
1188 2344 : }
1189 :
1190 : /*
1191 : * Add or remove table to/from publication.
1192 : */
1193 : static void
1194 890 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
1195 : List *tables, const char *queryString,
1196 : bool publish_schema)
1197 : {
1198 890 : List *rels = NIL;
1199 890 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1200 890 : Oid pubid = pubform->oid;
1201 :
1202 : /*
1203 : * Nothing to do if no objects, except in SET: for that it is quite
1204 : * possible that user has not specified any tables in which case we need
1205 : * to remove all the existing tables.
1206 : */
1207 890 : if (!tables && stmt->action != AP_SetObjects)
1208 66 : return;
1209 :
1210 824 : rels = OpenTableList(tables);
1211 :
1212 824 : if (stmt->action == AP_AddObjects)
1213 : {
1214 280 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1215 :
1216 262 : publish_schema |= is_schema_publication(pubid);
1217 :
1218 262 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1219 262 : pubform->pubviaroot);
1220 :
1221 250 : PublicationAddTables(pubid, rels, false, stmt);
1222 : }
1223 544 : else if (stmt->action == AP_DropObjects)
1224 98 : PublicationDropTables(pubid, rels, false);
1225 : else /* AP_SetObjects */
1226 : {
1227 446 : List *oldrelids = GetPublicationRelations(pubid,
1228 : PUBLICATION_PART_ROOT);
1229 446 : List *delrels = NIL;
1230 : ListCell *oldlc;
1231 :
1232 446 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1233 :
1234 428 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1235 428 : pubform->pubviaroot);
1236 :
1237 : /*
1238 : * To recreate the relation list for the publication, look for
1239 : * existing relations that do not need to be dropped.
1240 : */
1241 806 : foreach(oldlc, oldrelids)
1242 : {
1243 402 : Oid oldrelid = lfirst_oid(oldlc);
1244 : ListCell *newlc;
1245 : PublicationRelInfo *oldrel;
1246 402 : bool found = false;
1247 : HeapTuple rftuple;
1248 402 : Node *oldrelwhereclause = NULL;
1249 402 : Bitmapset *oldcolumns = NULL;
1250 :
1251 : /* look up the cache for the old relmap */
1252 402 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1253 : ObjectIdGetDatum(oldrelid),
1254 : ObjectIdGetDatum(pubid));
1255 :
1256 : /*
1257 : * See if the existing relation currently has a WHERE clause or a
1258 : * column list. We need to compare those too.
1259 : */
1260 402 : if (HeapTupleIsValid(rftuple))
1261 : {
1262 402 : bool isnull = true;
1263 : Datum whereClauseDatum;
1264 : Datum columnListDatum;
1265 :
1266 : /* Load the WHERE clause for this table. */
1267 402 : whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1268 : Anum_pg_publication_rel_prqual,
1269 : &isnull);
1270 402 : if (!isnull)
1271 210 : oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
1272 :
1273 : /* Transform the int2vector column list to a bitmap. */
1274 402 : columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1275 : Anum_pg_publication_rel_prattrs,
1276 : &isnull);
1277 :
1278 402 : if (!isnull)
1279 136 : oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
1280 :
1281 402 : ReleaseSysCache(rftuple);
1282 : }
1283 :
1284 778 : foreach(newlc, rels)
1285 : {
1286 : PublicationRelInfo *newpubrel;
1287 : Oid newrelid;
1288 404 : Bitmapset *newcolumns = NULL;
1289 :
1290 404 : newpubrel = (PublicationRelInfo *) lfirst(newlc);
1291 404 : newrelid = RelationGetRelid(newpubrel->relation);
1292 :
1293 : /*
1294 : * Validate the column list. If the column list or WHERE
1295 : * clause changes, then the validation done here will be
1296 : * duplicated inside PublicationAddTables(). The validation
1297 : * is cheap enough that that seems harmless.
1298 : */
1299 404 : newcolumns = pub_collist_validate(newpubrel->relation,
1300 : newpubrel->columns);
1301 :
1302 : /*
1303 : * Check if any of the new set of relations matches with the
1304 : * existing relations in the publication. Additionally, if the
1305 : * relation has an associated WHERE clause, check the WHERE
1306 : * expressions also match. Same for the column list. Drop the
1307 : * rest.
1308 : */
1309 392 : if (newrelid == oldrelid)
1310 : {
1311 284 : if (equal(oldrelwhereclause, newpubrel->whereClause) &&
1312 80 : bms_equal(oldcolumns, newcolumns))
1313 : {
1314 16 : found = true;
1315 16 : break;
1316 : }
1317 : }
1318 : }
1319 :
1320 : /*
1321 : * Add the non-matched relations to a list so that they can be
1322 : * dropped.
1323 : */
1324 390 : if (!found)
1325 : {
1326 374 : oldrel = palloc(sizeof(PublicationRelInfo));
1327 374 : oldrel->whereClause = NULL;
1328 374 : oldrel->columns = NIL;
1329 374 : oldrel->relation = table_open(oldrelid,
1330 : ShareUpdateExclusiveLock);
1331 374 : delrels = lappend(delrels, oldrel);
1332 : }
1333 : }
1334 :
1335 : /* And drop them. */
1336 404 : PublicationDropTables(pubid, delrels, true);
1337 :
1338 : /*
1339 : * Don't bother calculating the difference for adding, we'll catch and
1340 : * skip existing ones when doing catalog update.
1341 : */
1342 404 : PublicationAddTables(pubid, rels, true, stmt);
1343 :
1344 404 : CloseTableList(delrels);
1345 : }
1346 :
1347 692 : CloseTableList(rels);
1348 : }
1349 :
1350 : /*
1351 : * Alter the publication schemas.
1352 : *
1353 : * Add or remove schemas to/from publication.
1354 : */
1355 : static void
1356 758 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
1357 : HeapTuple tup, List *schemaidlist)
1358 : {
1359 758 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1360 :
1361 : /*
1362 : * Nothing to do if no objects, except in SET: for that it is quite
1363 : * possible that user has not specified any schemas in which case we need
1364 : * to remove all the existing schemas.
1365 : */
1366 758 : if (!schemaidlist && stmt->action != AP_SetObjects)
1367 288 : return;
1368 :
1369 : /*
1370 : * Schema lock is held until the publication is altered to prevent
1371 : * concurrent schema deletion.
1372 : */
1373 470 : LockSchemaList(schemaidlist);
1374 470 : if (stmt->action == AP_AddObjects)
1375 : {
1376 : ListCell *lc;
1377 : List *reloids;
1378 :
1379 28 : reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
1380 :
1381 38 : foreach(lc, reloids)
1382 : {
1383 : HeapTuple coltuple;
1384 :
1385 16 : coltuple = SearchSysCache2(PUBLICATIONRELMAP,
1386 : ObjectIdGetDatum(lfirst_oid(lc)),
1387 : ObjectIdGetDatum(pubform->oid));
1388 :
1389 16 : if (!HeapTupleIsValid(coltuple))
1390 0 : continue;
1391 :
1392 : /*
1393 : * Disallow adding schema if column list is already part of the
1394 : * publication. See CheckPubRelationColumnList.
1395 : */
1396 16 : if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
1397 6 : ereport(ERROR,
1398 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1399 : errmsg("cannot add schema to publication \"%s\"",
1400 : stmt->pubname),
1401 : errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1402 :
1403 10 : ReleaseSysCache(coltuple);
1404 : }
1405 :
1406 22 : PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
1407 : }
1408 442 : else if (stmt->action == AP_DropObjects)
1409 38 : PublicationDropSchemas(pubform->oid, schemaidlist, false);
1410 : else /* AP_SetObjects */
1411 : {
1412 404 : List *oldschemaids = GetPublicationSchemas(pubform->oid);
1413 404 : List *delschemas = NIL;
1414 :
1415 : /* Identify which schemas should be dropped */
1416 404 : delschemas = list_difference_oid(oldschemaids, schemaidlist);
1417 :
1418 : /*
1419 : * Schema lock is held until the publication is altered to prevent
1420 : * concurrent schema deletion.
1421 : */
1422 404 : LockSchemaList(delschemas);
1423 :
1424 : /* And drop them */
1425 404 : PublicationDropSchemas(pubform->oid, delschemas, true);
1426 :
1427 : /*
1428 : * Don't bother calculating the difference for adding, we'll catch and
1429 : * skip existing ones when doing catalog update.
1430 : */
1431 404 : PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
1432 : }
1433 : }
1434 :
1435 : /*
1436 : * Check if relations and schemas can be in a given publication and throw
1437 : * appropriate error if not.
1438 : */
1439 : static void
1440 932 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
1441 : List *tables, List *schemaidlist)
1442 : {
1443 932 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1444 :
1445 932 : if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
1446 94 : schemaidlist && !superuser())
1447 6 : ereport(ERROR,
1448 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1449 : errmsg("must be superuser to add or set schemas")));
1450 :
1451 : /*
1452 : * Check that user is allowed to manipulate the publication tables in
1453 : * schema
1454 : */
1455 926 : if (schemaidlist && pubform->puballtables)
1456 18 : ereport(ERROR,
1457 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1458 : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1459 : NameStr(pubform->pubname)),
1460 : errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
1461 :
1462 : /* Check that user is allowed to manipulate the publication tables. */
1463 908 : if (tables && pubform->puballtables)
1464 18 : ereport(ERROR,
1465 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1466 : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1467 : NameStr(pubform->pubname)),
1468 : errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
1469 890 : }
1470 :
1471 : /*
1472 : * Alter the existing publication.
1473 : *
1474 : * This is dispatcher function for AlterPublicationOptions,
1475 : * AlterPublicationSchemas and AlterPublicationTables.
1476 : */
1477 : void
1478 1066 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
1479 : {
1480 : Relation rel;
1481 : HeapTuple tup;
1482 : Form_pg_publication pubform;
1483 :
1484 1066 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1485 :
1486 1066 : tup = SearchSysCacheCopy1(PUBLICATIONNAME,
1487 : CStringGetDatum(stmt->pubname));
1488 :
1489 1066 : if (!HeapTupleIsValid(tup))
1490 0 : ereport(ERROR,
1491 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1492 : errmsg("publication \"%s\" does not exist",
1493 : stmt->pubname)));
1494 :
1495 1066 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1496 :
1497 : /* must be owner */
1498 1066 : if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
1499 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
1500 0 : stmt->pubname);
1501 :
1502 1066 : if (stmt->options)
1503 116 : AlterPublicationOptions(pstate, stmt, rel, tup);
1504 : else
1505 : {
1506 950 : List *relations = NIL;
1507 950 : List *schemaidlist = NIL;
1508 950 : Oid pubid = pubform->oid;
1509 :
1510 950 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
1511 : &schemaidlist);
1512 :
1513 932 : CheckAlterPublication(stmt, tup, relations, schemaidlist);
1514 :
1515 890 : heap_freetuple(tup);
1516 :
1517 : /* Lock the publication so nobody else can do anything with it. */
1518 890 : LockDatabaseObject(PublicationRelationId, pubid, 0,
1519 : AccessExclusiveLock);
1520 :
1521 : /*
1522 : * It is possible that by the time we acquire the lock on publication,
1523 : * concurrent DDL has removed it. We can test this by checking the
1524 : * existence of publication. We get the tuple again to avoid the risk
1525 : * of any publication option getting changed.
1526 : */
1527 890 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1528 890 : if (!HeapTupleIsValid(tup))
1529 0 : ereport(ERROR,
1530 : errcode(ERRCODE_UNDEFINED_OBJECT),
1531 : errmsg("publication \"%s\" does not exist",
1532 : stmt->pubname));
1533 :
1534 890 : AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
1535 : schemaidlist != NIL);
1536 758 : AlterPublicationSchemas(stmt, tup, schemaidlist);
1537 : }
1538 :
1539 : /* Cleanup. */
1540 844 : heap_freetuple(tup);
1541 844 : table_close(rel, RowExclusiveLock);
1542 844 : }
1543 :
1544 : /*
1545 : * Remove relation from publication by mapping OID.
1546 : */
1547 : void
1548 840 : RemovePublicationRelById(Oid proid)
1549 : {
1550 : Relation rel;
1551 : HeapTuple tup;
1552 : Form_pg_publication_rel pubrel;
1553 840 : List *relids = NIL;
1554 :
1555 840 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
1556 :
1557 840 : tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
1558 :
1559 840 : if (!HeapTupleIsValid(tup))
1560 0 : elog(ERROR, "cache lookup failed for publication table %u",
1561 : proid);
1562 :
1563 840 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1564 :
1565 : /*
1566 : * Invalidate relcache so that publication info is rebuilt.
1567 : *
1568 : * For the partitioned tables, we must invalidate all partitions contained
1569 : * in the respective partition hierarchies, not just the one explicitly
1570 : * mentioned in the publication. This is required because we implicitly
1571 : * publish the child tables when the parent table is published.
1572 : */
1573 840 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
1574 : pubrel->prrelid);
1575 :
1576 840 : InvalidatePublicationRels(relids);
1577 :
1578 840 : CatalogTupleDelete(rel, &tup->t_self);
1579 :
1580 840 : ReleaseSysCache(tup);
1581 :
1582 840 : table_close(rel, RowExclusiveLock);
1583 840 : }
1584 :
1585 : /*
1586 : * Remove the publication by mapping OID.
1587 : */
1588 : void
1589 470 : RemovePublicationById(Oid pubid)
1590 : {
1591 : Relation rel;
1592 : HeapTuple tup;
1593 : Form_pg_publication pubform;
1594 :
1595 470 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1596 :
1597 470 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1598 470 : if (!HeapTupleIsValid(tup))
1599 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1600 :
1601 470 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1602 :
1603 : /* Invalidate relcache so that publication info is rebuilt. */
1604 470 : if (pubform->puballtables)
1605 60 : CacheInvalidateRelcacheAll();
1606 :
1607 470 : CatalogTupleDelete(rel, &tup->t_self);
1608 :
1609 470 : ReleaseSysCache(tup);
1610 :
1611 470 : table_close(rel, RowExclusiveLock);
1612 470 : }
1613 :
1614 : /*
1615 : * Remove schema from publication by mapping OID.
1616 : */
1617 : void
1618 192 : RemovePublicationSchemaById(Oid psoid)
1619 : {
1620 : Relation rel;
1621 : HeapTuple tup;
1622 192 : List *schemaRels = NIL;
1623 : Form_pg_publication_namespace pubsch;
1624 :
1625 192 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
1626 :
1627 192 : tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
1628 :
1629 192 : if (!HeapTupleIsValid(tup))
1630 0 : elog(ERROR, "cache lookup failed for publication schema %u", psoid);
1631 :
1632 192 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1633 :
1634 : /*
1635 : * Invalidate relcache so that publication info is rebuilt. See
1636 : * RemovePublicationRelById for why we need to consider all the
1637 : * partitions.
1638 : */
1639 192 : schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
1640 : PUBLICATION_PART_ALL);
1641 192 : InvalidatePublicationRels(schemaRels);
1642 :
1643 192 : CatalogTupleDelete(rel, &tup->t_self);
1644 :
1645 192 : ReleaseSysCache(tup);
1646 :
1647 192 : table_close(rel, RowExclusiveLock);
1648 192 : }
1649 :
1650 : /*
1651 : * Open relations specified by a PublicationTable list.
1652 : * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1653 : * add them to a publication.
1654 : */
1655 : static List *
1656 1272 : OpenTableList(List *tables)
1657 : {
1658 1272 : List *relids = NIL;
1659 1272 : List *rels = NIL;
1660 : ListCell *lc;
1661 1272 : List *relids_with_rf = NIL;
1662 1272 : List *relids_with_collist = NIL;
1663 :
1664 : /*
1665 : * Open, share-lock, and check all the explicitly-specified relations
1666 : */
1667 2602 : foreach(lc, tables)
1668 : {
1669 1354 : PublicationTable *t = lfirst_node(PublicationTable, lc);
1670 1354 : bool recurse = t->relation->inh;
1671 : Relation rel;
1672 : Oid myrelid;
1673 : PublicationRelInfo *pub_rel;
1674 :
1675 : /* Allow query cancel in case this takes a long time */
1676 1354 : CHECK_FOR_INTERRUPTS();
1677 :
1678 1354 : rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
1679 1354 : myrelid = RelationGetRelid(rel);
1680 :
1681 : /*
1682 : * Filter out duplicates if user specifies "foo, foo".
1683 : *
1684 : * Note that this algorithm is known to not be very efficient (O(N^2))
1685 : * but given that it only works on list of tables given to us by user
1686 : * it's deemed acceptable.
1687 : */
1688 1354 : if (list_member_oid(relids, myrelid))
1689 : {
1690 : /* Disallow duplicate tables if there are any with row filters. */
1691 24 : if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
1692 12 : ereport(ERROR,
1693 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1694 : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1695 : RelationGetRelationName(rel))));
1696 :
1697 : /* Disallow duplicate tables if there are any with column lists. */
1698 12 : if (t->columns || list_member_oid(relids_with_collist, myrelid))
1699 12 : ereport(ERROR,
1700 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1701 : errmsg("conflicting or redundant column lists for table \"%s\"",
1702 : RelationGetRelationName(rel))));
1703 :
1704 0 : table_close(rel, ShareUpdateExclusiveLock);
1705 0 : continue;
1706 : }
1707 :
1708 1330 : pub_rel = palloc(sizeof(PublicationRelInfo));
1709 1330 : pub_rel->relation = rel;
1710 1330 : pub_rel->whereClause = t->whereClause;
1711 1330 : pub_rel->columns = t->columns;
1712 1330 : rels = lappend(rels, pub_rel);
1713 1330 : relids = lappend_oid(relids, myrelid);
1714 :
1715 1330 : if (t->whereClause)
1716 386 : relids_with_rf = lappend_oid(relids_with_rf, myrelid);
1717 :
1718 1330 : if (t->columns)
1719 386 : relids_with_collist = lappend_oid(relids_with_collist, myrelid);
1720 :
1721 : /*
1722 : * Add children of this rel, if requested, so that they too are added
1723 : * to the publication. A partitioned table can't have any inheritance
1724 : * children other than its partitions, which need not be explicitly
1725 : * added to the publication.
1726 : */
1727 1330 : if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1728 : {
1729 : List *children;
1730 : ListCell *child;
1731 :
1732 1146 : children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
1733 : NULL);
1734 :
1735 2300 : foreach(child, children)
1736 : {
1737 1154 : Oid childrelid = lfirst_oid(child);
1738 :
1739 : /* Allow query cancel in case this takes a long time */
1740 1154 : CHECK_FOR_INTERRUPTS();
1741 :
1742 : /*
1743 : * Skip duplicates if user specified both parent and child
1744 : * tables.
1745 : */
1746 1154 : if (list_member_oid(relids, childrelid))
1747 : {
1748 : /*
1749 : * We don't allow to specify row filter for both parent
1750 : * and child table at the same time as it is not very
1751 : * clear which one should be given preference.
1752 : */
1753 1146 : if (childrelid != myrelid &&
1754 0 : (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
1755 0 : ereport(ERROR,
1756 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1757 : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1758 : RelationGetRelationName(rel))));
1759 :
1760 : /*
1761 : * We don't allow to specify column list for both parent
1762 : * and child table at the same time as it is not very
1763 : * clear which one should be given preference.
1764 : */
1765 1146 : if (childrelid != myrelid &&
1766 0 : (t->columns || list_member_oid(relids_with_collist, childrelid)))
1767 0 : ereport(ERROR,
1768 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1769 : errmsg("conflicting or redundant column lists for table \"%s\"",
1770 : RelationGetRelationName(rel))));
1771 :
1772 1146 : continue;
1773 : }
1774 :
1775 : /* find_all_inheritors already got lock */
1776 8 : rel = table_open(childrelid, NoLock);
1777 8 : pub_rel = palloc(sizeof(PublicationRelInfo));
1778 8 : pub_rel->relation = rel;
1779 : /* child inherits WHERE clause from parent */
1780 8 : pub_rel->whereClause = t->whereClause;
1781 :
1782 : /* child inherits column list from parent */
1783 8 : pub_rel->columns = t->columns;
1784 8 : rels = lappend(rels, pub_rel);
1785 8 : relids = lappend_oid(relids, childrelid);
1786 :
1787 8 : if (t->whereClause)
1788 2 : relids_with_rf = lappend_oid(relids_with_rf, childrelid);
1789 :
1790 8 : if (t->columns)
1791 0 : relids_with_collist = lappend_oid(relids_with_collist, childrelid);
1792 : }
1793 : }
1794 : }
1795 :
1796 1248 : list_free(relids);
1797 1248 : list_free(relids_with_rf);
1798 :
1799 1248 : return rels;
1800 : }
1801 :
1802 : /*
1803 : * Close all relations in the list.
1804 : */
1805 : static void
1806 1458 : CloseTableList(List *rels)
1807 : {
1808 : ListCell *lc;
1809 :
1810 2952 : foreach(lc, rels)
1811 : {
1812 : PublicationRelInfo *pub_rel;
1813 :
1814 1494 : pub_rel = (PublicationRelInfo *) lfirst(lc);
1815 1494 : table_close(pub_rel->relation, NoLock);
1816 : }
1817 :
1818 1458 : list_free_deep(rels);
1819 1458 : }
1820 :
1821 : /*
1822 : * Lock the schemas specified in the schema list in AccessShareLock mode in
1823 : * order to prevent concurrent schema deletion.
1824 : */
1825 : static void
1826 1008 : LockSchemaList(List *schemalist)
1827 : {
1828 : ListCell *lc;
1829 :
1830 1292 : foreach(lc, schemalist)
1831 : {
1832 284 : Oid schemaid = lfirst_oid(lc);
1833 :
1834 : /* Allow query cancel in case this takes a long time */
1835 284 : CHECK_FOR_INTERRUPTS();
1836 284 : LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
1837 :
1838 : /*
1839 : * It is possible that by the time we acquire the lock on schema,
1840 : * concurrent DDL has removed it. We can test this by checking the
1841 : * existence of schema.
1842 : */
1843 284 : if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
1844 0 : ereport(ERROR,
1845 : errcode(ERRCODE_UNDEFINED_SCHEMA),
1846 : errmsg("schema with OID %u does not exist", schemaid));
1847 : }
1848 1008 : }
1849 :
1850 : /*
1851 : * Add listed tables to the publication.
1852 : */
1853 : static void
1854 1042 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
1855 : AlterPublicationStmt *stmt)
1856 : {
1857 : ListCell *lc;
1858 :
1859 : Assert(!stmt || !stmt->for_all_tables);
1860 :
1861 2076 : foreach(lc, rels)
1862 : {
1863 1102 : PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
1864 1102 : Relation rel = pub_rel->relation;
1865 : ObjectAddress obj;
1866 :
1867 : /* Must be owner of the table or superuser. */
1868 1102 : if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
1869 6 : aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
1870 6 : RelationGetRelationName(rel));
1871 :
1872 1096 : obj = publication_add_relation(pubid, pub_rel, if_not_exists);
1873 1034 : if (stmt)
1874 : {
1875 612 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1876 : (Node *) stmt);
1877 :
1878 612 : InvokeObjectPostCreateHook(PublicationRelRelationId,
1879 : obj.objectId, 0);
1880 : }
1881 : }
1882 974 : }
1883 :
1884 : /*
1885 : * Remove listed tables from the publication.
1886 : */
1887 : static void
1888 502 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
1889 : {
1890 : ObjectAddress obj;
1891 : ListCell *lc;
1892 : Oid prid;
1893 :
1894 962 : foreach(lc, rels)
1895 : {
1896 478 : PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
1897 478 : Relation rel = pubrel->relation;
1898 478 : Oid relid = RelationGetRelid(rel);
1899 :
1900 478 : if (pubrel->columns)
1901 0 : ereport(ERROR,
1902 : errcode(ERRCODE_SYNTAX_ERROR),
1903 : errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
1904 :
1905 478 : prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
1906 : ObjectIdGetDatum(relid),
1907 : ObjectIdGetDatum(pubid));
1908 478 : if (!OidIsValid(prid))
1909 : {
1910 12 : if (missing_ok)
1911 0 : continue;
1912 :
1913 12 : ereport(ERROR,
1914 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1915 : errmsg("relation \"%s\" is not part of the publication",
1916 : RelationGetRelationName(rel))));
1917 : }
1918 :
1919 466 : if (pubrel->whereClause)
1920 6 : ereport(ERROR,
1921 : (errcode(ERRCODE_SYNTAX_ERROR),
1922 : errmsg("cannot use a WHERE clause when removing a table from a publication")));
1923 :
1924 460 : ObjectAddressSet(obj, PublicationRelRelationId, prid);
1925 460 : performDeletion(&obj, DROP_CASCADE, 0);
1926 : }
1927 484 : }
1928 :
1929 : /*
1930 : * Add listed schemas to the publication.
1931 : */
1932 : static void
1933 560 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
1934 : AlterPublicationStmt *stmt)
1935 : {
1936 : ListCell *lc;
1937 :
1938 : Assert(!stmt || !stmt->for_all_tables);
1939 :
1940 770 : foreach(lc, schemas)
1941 : {
1942 222 : Oid schemaid = lfirst_oid(lc);
1943 : ObjectAddress obj;
1944 :
1945 222 : obj = publication_add_schema(pubid, schemaid, if_not_exists);
1946 210 : if (stmt)
1947 : {
1948 58 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1949 : (Node *) stmt);
1950 :
1951 58 : InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
1952 : obj.objectId, 0);
1953 : }
1954 : }
1955 548 : }
1956 :
1957 : /*
1958 : * Remove listed schemas from the publication.
1959 : */
1960 : static void
1961 442 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
1962 : {
1963 : ObjectAddress obj;
1964 : ListCell *lc;
1965 : Oid psid;
1966 :
1967 492 : foreach(lc, schemas)
1968 : {
1969 56 : Oid schemaid = lfirst_oid(lc);
1970 :
1971 56 : psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
1972 : Anum_pg_publication_namespace_oid,
1973 : ObjectIdGetDatum(schemaid),
1974 : ObjectIdGetDatum(pubid));
1975 56 : if (!OidIsValid(psid))
1976 : {
1977 6 : if (missing_ok)
1978 0 : continue;
1979 :
1980 6 : ereport(ERROR,
1981 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1982 : errmsg("tables from schema \"%s\" are not part of the publication",
1983 : get_namespace_name(schemaid))));
1984 : }
1985 :
1986 50 : ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
1987 50 : performDeletion(&obj, DROP_CASCADE, 0);
1988 : }
1989 436 : }
1990 :
1991 : /*
1992 : * Internal workhorse for changing a publication owner
1993 : */
1994 : static void
1995 26 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
1996 : {
1997 : Form_pg_publication form;
1998 :
1999 26 : form = (Form_pg_publication) GETSTRUCT(tup);
2000 :
2001 26 : if (form->pubowner == newOwnerId)
2002 2 : return;
2003 :
2004 24 : if (!superuser())
2005 : {
2006 : AclResult aclresult;
2007 :
2008 : /* Must be owner */
2009 12 : if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
2010 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
2011 0 : NameStr(form->pubname));
2012 :
2013 : /* Must be able to become new owner */
2014 12 : check_can_set_role(GetUserId(), newOwnerId);
2015 :
2016 : /* New owner must have CREATE privilege on database */
2017 12 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
2018 12 : if (aclresult != ACLCHECK_OK)
2019 0 : aclcheck_error(aclresult, OBJECT_DATABASE,
2020 0 : get_database_name(MyDatabaseId));
2021 :
2022 12 : if (form->puballtables && !superuser_arg(newOwnerId))
2023 0 : ereport(ERROR,
2024 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2025 : errmsg("permission denied to change owner of publication \"%s\"",
2026 : NameStr(form->pubname)),
2027 : errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
2028 :
2029 12 : if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid))
2030 6 : ereport(ERROR,
2031 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2032 : errmsg("permission denied to change owner of publication \"%s\"",
2033 : NameStr(form->pubname)),
2034 : errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
2035 : }
2036 :
2037 18 : form->pubowner = newOwnerId;
2038 18 : CatalogTupleUpdate(rel, &tup->t_self, tup);
2039 :
2040 : /* Update owner dependency reference */
2041 18 : changeDependencyOnOwner(PublicationRelationId,
2042 : form->oid,
2043 : newOwnerId);
2044 :
2045 18 : InvokeObjectPostAlterHook(PublicationRelationId,
2046 : form->oid, 0);
2047 : }
2048 :
2049 : /*
2050 : * Change publication owner -- by name
2051 : */
2052 : ObjectAddress
2053 26 : AlterPublicationOwner(const char *name, Oid newOwnerId)
2054 : {
2055 : Oid pubid;
2056 : HeapTuple tup;
2057 : Relation rel;
2058 : ObjectAddress address;
2059 : Form_pg_publication pubform;
2060 :
2061 26 : rel = table_open(PublicationRelationId, RowExclusiveLock);
2062 :
2063 26 : tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
2064 :
2065 26 : if (!HeapTupleIsValid(tup))
2066 0 : ereport(ERROR,
2067 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2068 : errmsg("publication \"%s\" does not exist", name)));
2069 :
2070 26 : pubform = (Form_pg_publication) GETSTRUCT(tup);
2071 26 : pubid = pubform->oid;
2072 :
2073 26 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
2074 :
2075 20 : ObjectAddressSet(address, PublicationRelationId, pubid);
2076 :
2077 20 : heap_freetuple(tup);
2078 :
2079 20 : table_close(rel, RowExclusiveLock);
2080 :
2081 20 : return address;
2082 : }
2083 :
2084 : /*
2085 : * Change publication owner -- by OID
2086 : */
2087 : void
2088 0 : AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId)
2089 : {
2090 : HeapTuple tup;
2091 : Relation rel;
2092 :
2093 0 : rel = table_open(PublicationRelationId, RowExclusiveLock);
2094 :
2095 0 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
2096 :
2097 0 : if (!HeapTupleIsValid(tup))
2098 0 : ereport(ERROR,
2099 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2100 : errmsg("publication with OID %u does not exist", pubid)));
2101 :
2102 0 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
2103 :
2104 0 : heap_freetuple(tup);
2105 :
2106 0 : table_close(rel, RowExclusiveLock);
2107 0 : }
2108 :
2109 : /*
2110 : * Extract the publish_generated_columns option value from a DefElem. "stored"
2111 : * and "none" values are accepted.
2112 : */
2113 : static char
2114 70 : defGetGeneratedColsOption(DefElem *def)
2115 : {
2116 : char *sval;
2117 :
2118 : /*
2119 : * If no parameter value given, assume "stored" is meant.
2120 : */
2121 70 : if (!def->arg)
2122 6 : return PUBLISH_GENCOLS_STORED;
2123 :
2124 64 : sval = defGetString(def);
2125 :
2126 64 : if (pg_strcasecmp(sval, "none") == 0)
2127 22 : return PUBLISH_GENCOLS_NONE;
2128 42 : if (pg_strcasecmp(sval, "stored") == 0)
2129 36 : return PUBLISH_GENCOLS_STORED;
2130 :
2131 6 : ereport(ERROR,
2132 : errcode(ERRCODE_SYNTAX_ERROR),
2133 : errmsg("%s requires a \"none\" or \"stored\" value",
2134 : def->defname));
2135 :
2136 : return PUBLISH_GENCOLS_NONE; /* keep compiler quiet */
2137 : }
|