Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * publicationcmds.c
4 : * publication manipulation
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/publicationcmds.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/htup_details.h"
18 : #include "access/table.h"
19 : #include "access/xact.h"
20 : #include "catalog/catalog.h"
21 : #include "catalog/indexing.h"
22 : #include "catalog/namespace.h"
23 : #include "catalog/objectaccess.h"
24 : #include "catalog/objectaddress.h"
25 : #include "catalog/pg_database.h"
26 : #include "catalog/pg_inherits.h"
27 : #include "catalog/pg_namespace.h"
28 : #include "catalog/pg_proc.h"
29 : #include "catalog/pg_publication.h"
30 : #include "catalog/pg_publication_namespace.h"
31 : #include "catalog/pg_publication_rel.h"
32 : #include "commands/defrem.h"
33 : #include "commands/event_trigger.h"
34 : #include "commands/publicationcmds.h"
35 : #include "miscadmin.h"
36 : #include "nodes/nodeFuncs.h"
37 : #include "parser/parse_clause.h"
38 : #include "parser/parse_collate.h"
39 : #include "parser/parse_relation.h"
40 : #include "rewrite/rewriteHandler.h"
41 : #include "storage/lmgr.h"
42 : #include "utils/acl.h"
43 : #include "utils/builtins.h"
44 : #include "utils/inval.h"
45 : #include "utils/lsyscache.h"
46 : #include "utils/rel.h"
47 : #include "utils/syscache.h"
48 : #include "utils/varlena.h"
49 :
50 :
51 : /*
52 : * Information used to validate the columns in the row filter expression. See
53 : * contain_invalid_rfcolumn_walker for details.
54 : */
55 : typedef struct rf_context
56 : {
57 : Bitmapset *bms_replident; /* bitset of replica identity columns */
58 : bool pubviaroot; /* true if we are validating the parent
59 : * relation's row filter */
60 : Oid relid; /* relid of the relation */
61 : Oid parentid; /* relid of the parent relation */
62 : } rf_context;
63 :
64 : static List *OpenTableList(List *tables);
65 : static void CloseTableList(List *rels);
66 : static void LockSchemaList(List *schemalist);
67 : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
68 : AlterPublicationStmt *stmt);
69 : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
70 : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
71 : AlterPublicationStmt *stmt);
72 : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
73 : static char defGetGeneratedColsOption(DefElem *def);
74 :
75 :
76 : static void
77 666 : parse_publication_options(ParseState *pstate,
78 : List *options,
79 : bool *publish_given,
80 : PublicationActions *pubactions,
81 : bool *publish_via_partition_root_given,
82 : bool *publish_via_partition_root,
83 : bool *publish_generated_columns_given,
84 : char *publish_generated_columns)
85 : {
86 : ListCell *lc;
87 :
88 666 : *publish_given = false;
89 666 : *publish_via_partition_root_given = false;
90 666 : *publish_generated_columns_given = false;
91 :
92 : /* defaults */
93 666 : pubactions->pubinsert = true;
94 666 : pubactions->pubupdate = true;
95 666 : pubactions->pubdelete = true;
96 666 : pubactions->pubtruncate = true;
97 666 : *publish_via_partition_root = false;
98 666 : *publish_generated_columns = PUBLISH_GENCOLS_NONE;
99 :
100 : /* Parse options */
101 898 : foreach(lc, options)
102 : {
103 256 : DefElem *defel = (DefElem *) lfirst(lc);
104 :
105 256 : if (strcmp(defel->defname, "publish") == 0)
106 : {
107 : char *publish;
108 : List *publish_list;
109 : ListCell *lc2;
110 :
111 89 : if (*publish_given)
112 0 : errorConflictingDefElem(defel, pstate);
113 :
114 : /*
115 : * If publish option was given only the explicitly listed actions
116 : * should be published.
117 : */
118 89 : pubactions->pubinsert = false;
119 89 : pubactions->pubupdate = false;
120 89 : pubactions->pubdelete = false;
121 89 : pubactions->pubtruncate = false;
122 :
123 89 : *publish_given = true;
124 :
125 : /*
126 : * SplitIdentifierString destructively modifies its input, so make
127 : * a copy so we don't modify the memory of the executing statement
128 : */
129 89 : publish = pstrdup(defGetString(defel));
130 :
131 89 : if (!SplitIdentifierString(publish, ',', &publish_list))
132 0 : ereport(ERROR,
133 : (errcode(ERRCODE_SYNTAX_ERROR),
134 : errmsg("invalid list syntax in parameter \"%s\"",
135 : "publish")));
136 :
137 : /* Process the option list. */
138 208 : foreach(lc2, publish_list)
139 : {
140 123 : char *publish_opt = (char *) lfirst(lc2);
141 :
142 123 : if (strcmp(publish_opt, "insert") == 0)
143 79 : pubactions->pubinsert = true;
144 44 : else if (strcmp(publish_opt, "update") == 0)
145 18 : pubactions->pubupdate = true;
146 26 : else if (strcmp(publish_opt, "delete") == 0)
147 11 : pubactions->pubdelete = true;
148 15 : else if (strcmp(publish_opt, "truncate") == 0)
149 11 : pubactions->pubtruncate = true;
150 : else
151 4 : ereport(ERROR,
152 : (errcode(ERRCODE_SYNTAX_ERROR),
153 : errmsg("unrecognized value for publication option \"%s\": \"%s\"",
154 : "publish", publish_opt)));
155 : }
156 : }
157 167 : else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
158 : {
159 110 : if (*publish_via_partition_root_given)
160 4 : errorConflictingDefElem(defel, pstate);
161 106 : *publish_via_partition_root_given = true;
162 106 : *publish_via_partition_root = defGetBoolean(defel);
163 : }
164 57 : else if (strcmp(defel->defname, "publish_generated_columns") == 0)
165 : {
166 53 : if (*publish_generated_columns_given)
167 4 : errorConflictingDefElem(defel, pstate);
168 49 : *publish_generated_columns_given = true;
169 49 : *publish_generated_columns = defGetGeneratedColsOption(defel);
170 : }
171 : else
172 4 : ereport(ERROR,
173 : (errcode(ERRCODE_SYNTAX_ERROR),
174 : errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
175 : }
176 642 : }
177 :
178 : /*
179 : * Convert the PublicationObjSpecType list into schema oid list and
180 : * PublicationTable list.
181 : */
182 : static void
183 1251 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
184 : List **rels, List **exceptrels, List **schemas)
185 : {
186 : ListCell *cell;
187 : PublicationObjSpec *pubobj;
188 :
189 1251 : if (!pubobjspec_list)
190 183 : return;
191 :
192 2264 : foreach(cell, pubobjspec_list)
193 : {
194 : Oid schemaid;
195 : List *search_path;
196 :
197 1220 : pubobj = (PublicationObjSpec *) lfirst(cell);
198 :
199 1220 : switch (pubobj->pubobjtype)
200 : {
201 59 : case PUBLICATIONOBJ_EXCEPT_TABLE:
202 59 : pubobj->pubtable->except = true;
203 59 : *exceptrels = lappend(*exceptrels, pubobj->pubtable);
204 59 : break;
205 901 : case PUBLICATIONOBJ_TABLE:
206 901 : pubobj->pubtable->except = false;
207 901 : *rels = lappend(*rels, pubobj->pubtable);
208 901 : break;
209 244 : case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
210 244 : schemaid = get_namespace_oid(pubobj->name, false);
211 :
212 : /* Filter out duplicates if user specifies "sch1, sch1" */
213 224 : *schemas = list_append_unique_oid(*schemas, schemaid);
214 224 : break;
215 16 : case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
216 16 : search_path = fetch_search_path(false);
217 16 : if (search_path == NIL) /* nothing valid in search_path? */
218 4 : ereport(ERROR,
219 : errcode(ERRCODE_UNDEFINED_SCHEMA),
220 : errmsg("no schema has been selected for CURRENT_SCHEMA"));
221 :
222 12 : schemaid = linitial_oid(search_path);
223 12 : list_free(search_path);
224 :
225 : /* Filter out duplicates if user specifies "sch1, sch1" */
226 12 : *schemas = list_append_unique_oid(*schemas, schemaid);
227 12 : break;
228 0 : default:
229 : /* shouldn't happen */
230 0 : elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
231 : break;
232 : }
233 : }
234 : }
235 :
236 : /*
237 : * Returns true if any of the columns used in the row filter WHERE expression is
238 : * not part of REPLICA IDENTITY, false otherwise.
239 : */
240 : static bool
241 167 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
242 : {
243 167 : if (node == NULL)
244 0 : return false;
245 :
246 167 : if (IsA(node, Var))
247 : {
248 68 : Var *var = (Var *) node;
249 68 : AttrNumber attnum = var->varattno;
250 :
251 : /*
252 : * If pubviaroot is true, we are validating the row filter of the
253 : * parent table, but the bitmap contains the replica identity
254 : * information of the child table. So, get the column number of the
255 : * child table as parent and child column order could be different.
256 : */
257 68 : if (context->pubviaroot)
258 : {
259 10 : char *colname = get_attname(context->parentid, attnum, false);
260 :
261 10 : attnum = get_attnum(context->relid, colname);
262 : }
263 :
264 68 : if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
265 68 : context->bms_replident))
266 40 : return true;
267 : }
268 :
269 127 : return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
270 : context);
271 : }
272 :
273 : /*
274 : * Check if all columns referenced in the filter expression are part of the
275 : * REPLICA IDENTITY index or not.
276 : *
277 : * Returns true if any invalid column is found.
278 : */
279 : bool
280 434 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
281 : bool pubviaroot)
282 : {
283 : HeapTuple rftuple;
284 434 : Oid relid = RelationGetRelid(relation);
285 434 : Oid publish_as_relid = RelationGetRelid(relation);
286 434 : bool result = false;
287 : Datum rfdatum;
288 : bool rfisnull;
289 :
290 : /*
291 : * FULL means all columns are in the REPLICA IDENTITY, so all columns are
292 : * allowed in the row filter and we can skip the validation.
293 : */
294 434 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
295 113 : return false;
296 :
297 : /*
298 : * For a partition, if pubviaroot is true, find the topmost ancestor that
299 : * is published via this publication as we need to use its row filter
300 : * expression to filter the partition's changes.
301 : *
302 : * Note that even though the row filter used is for an ancestor, the
303 : * REPLICA IDENTITY used will be for the actual child table.
304 : */
305 321 : if (pubviaroot && relation->rd_rel->relispartition)
306 : {
307 : publish_as_relid
308 68 : = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
309 :
310 68 : if (!OidIsValid(publish_as_relid))
311 3 : publish_as_relid = relid;
312 : }
313 :
314 321 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
315 : ObjectIdGetDatum(publish_as_relid),
316 : ObjectIdGetDatum(pubid));
317 :
318 321 : if (!HeapTupleIsValid(rftuple))
319 34 : return false;
320 :
321 287 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
322 : Anum_pg_publication_rel_prqual,
323 : &rfisnull);
324 :
325 287 : if (!rfisnull)
326 : {
327 67 : rf_context context = {0};
328 : Node *rfnode;
329 67 : Bitmapset *bms = NULL;
330 :
331 67 : context.pubviaroot = pubviaroot;
332 67 : context.parentid = publish_as_relid;
333 67 : context.relid = relid;
334 :
335 : /* Remember columns that are part of the REPLICA IDENTITY */
336 67 : bms = RelationGetIndexAttrBitmap(relation,
337 : INDEX_ATTR_BITMAP_IDENTITY_KEY);
338 :
339 67 : context.bms_replident = bms;
340 67 : rfnode = stringToNode(TextDatumGetCString(rfdatum));
341 67 : result = contain_invalid_rfcolumn_walker(rfnode, &context);
342 : }
343 :
344 287 : ReleaseSysCache(rftuple);
345 :
346 287 : return result;
347 : }
348 :
349 : /*
350 : * Check for invalid columns in the publication table definition.
351 : *
352 : * This function evaluates two conditions:
353 : *
354 : * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
355 : * by the column list. If any column is missing, *invalid_column_list is set
356 : * to true.
357 : * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
358 : * are published, either by being explicitly named in the column list or, if
359 : * no column list is specified, by setting the option
360 : * publish_generated_columns to stored. If any unpublished
361 : * generated column is found, *invalid_gen_col is set to true.
362 : *
363 : * Returns true if any of the above conditions are not met.
364 : */
365 : bool
366 547 : pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
367 : bool pubviaroot, char pubgencols_type,
368 : bool *invalid_column_list,
369 : bool *invalid_gen_col)
370 : {
371 547 : Oid relid = RelationGetRelid(relation);
372 547 : Oid publish_as_relid = RelationGetRelid(relation);
373 : Bitmapset *idattrs;
374 547 : Bitmapset *columns = NULL;
375 547 : TupleDesc desc = RelationGetDescr(relation);
376 : Publication *pub;
377 : int x;
378 :
379 547 : *invalid_column_list = false;
380 547 : *invalid_gen_col = false;
381 :
382 : /*
383 : * For a partition, if pubviaroot is true, find the topmost ancestor that
384 : * is published via this publication as we need to use its column list for
385 : * the changes.
386 : *
387 : * Note that even though the column list used is for an ancestor, the
388 : * REPLICA IDENTITY used will be for the actual child table.
389 : */
390 547 : if (pubviaroot && relation->rd_rel->relispartition)
391 : {
392 97 : publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
393 :
394 97 : if (!OidIsValid(publish_as_relid))
395 18 : publish_as_relid = relid;
396 : }
397 :
398 : /* Fetch the column list */
399 547 : pub = GetPublication(pubid);
400 547 : check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
401 :
402 547 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
403 : {
404 : /* With REPLICA IDENTITY FULL, no column list is allowed. */
405 120 : *invalid_column_list = (columns != NULL);
406 :
407 : /*
408 : * As we don't allow a column list with REPLICA IDENTITY FULL, the
409 : * publish_generated_columns option must be set to stored if the table
410 : * has any stored generated columns.
411 : */
412 120 : if (pubgencols_type != PUBLISH_GENCOLS_STORED &&
413 112 : relation->rd_att->constr &&
414 52 : relation->rd_att->constr->has_generated_stored)
415 4 : *invalid_gen_col = true;
416 :
417 : /*
418 : * Virtual generated columns are currently not supported for logical
419 : * replication at all.
420 : */
421 120 : if (relation->rd_att->constr &&
422 60 : relation->rd_att->constr->has_generated_virtual)
423 8 : *invalid_gen_col = true;
424 :
425 120 : if (*invalid_gen_col && *invalid_column_list)
426 0 : return true;
427 : }
428 :
429 : /* Remember columns that are part of the REPLICA IDENTITY */
430 547 : idattrs = RelationGetIndexAttrBitmap(relation,
431 : INDEX_ATTR_BITMAP_IDENTITY_KEY);
432 :
433 : /*
434 : * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
435 : * (to handle system columns the usual way), while column list does not
436 : * use offset, so we can't do bms_is_subset(). Instead, we have to loop
437 : * over the idattrs and check all of them are in the list.
438 : */
439 547 : x = -1;
440 925 : while ((x = bms_next_member(idattrs, x)) >= 0)
441 : {
442 382 : AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber);
443 382 : Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
444 :
445 382 : if (columns == NULL)
446 : {
447 : /*
448 : * The publish_generated_columns option must be set to stored if
449 : * the REPLICA IDENTITY contains any stored generated column.
450 : */
451 247 : if (att->attgenerated == ATTRIBUTE_GENERATED_STORED && pubgencols_type != PUBLISH_GENCOLS_STORED)
452 : {
453 4 : *invalid_gen_col = true;
454 4 : break;
455 : }
456 :
457 : /*
458 : * The equivalent setting for virtual generated columns does not
459 : * exist yet.
460 : */
461 243 : if (att->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
462 : {
463 0 : *invalid_gen_col = true;
464 0 : break;
465 : }
466 :
467 : /* Skip validating the column list since it is not defined */
468 243 : continue;
469 : }
470 :
471 : /*
472 : * If pubviaroot is true, we are validating the column list of the
473 : * parent table, but the bitmap contains the replica identity
474 : * information of the child table. The parent/child attnums may not
475 : * match, so translate them to the parent - get the attname from the
476 : * child, and look it up in the parent.
477 : */
478 135 : if (pubviaroot)
479 : {
480 : /* attribute name in the child table */
481 51 : char *colname = get_attname(relid, attnum, false);
482 :
483 : /*
484 : * Determine the attnum for the attribute name in parent (we are
485 : * using the column list defined on the parent).
486 : */
487 51 : attnum = get_attnum(publish_as_relid, colname);
488 : }
489 :
490 : /* replica identity column, not covered by the column list */
491 135 : *invalid_column_list |= !bms_is_member(attnum, columns);
492 :
493 135 : if (*invalid_column_list && *invalid_gen_col)
494 0 : break;
495 : }
496 :
497 547 : bms_free(columns);
498 547 : bms_free(idattrs);
499 :
500 547 : return *invalid_column_list || *invalid_gen_col;
501 : }
502 :
503 : /*
504 : * Invalidate entries in the RelationSyncCache for relations included in the
505 : * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
506 : *
507 : * If 'puballtables' is true, invalidate all cache entries.
508 : */
509 : void
510 20 : InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
511 : {
512 20 : if (puballtables)
513 : {
514 3 : CacheInvalidateRelSyncAll();
515 : }
516 : else
517 : {
518 17 : List *relids = NIL;
519 17 : List *schemarelids = NIL;
520 :
521 : /*
522 : * For partitioned tables, we must invalidate all partitions and
523 : * itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as
524 : * a target. However, WAL records for TRUNCATE specify both a root and
525 : * its leaves.
526 : */
527 17 : relids = GetIncludedPublicationRelations(pubid,
528 : PUBLICATION_PART_ALL);
529 17 : schemarelids = GetAllSchemaPublicationRelations(pubid,
530 : PUBLICATION_PART_ALL);
531 :
532 17 : relids = list_concat_unique_oid(relids, schemarelids);
533 :
534 : /* Invalidate the relsyncache */
535 37 : foreach_oid(relid, relids)
536 3 : CacheInvalidateRelSync(relid);
537 : }
538 :
539 20 : return;
540 : }
541 :
542 : /* check_functions_in_node callback */
543 : static bool
544 278 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
545 : {
546 278 : return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
547 : func_id >= FirstNormalObjectId);
548 : }
549 :
550 : /*
551 : * The row filter walker checks if the row filter expression is a "simple
552 : * expression".
553 : *
554 : * It allows only simple or compound expressions such as:
555 : * - (Var Op Const)
556 : * - (Var Op Var)
557 : * - (Var Op Const) AND/OR (Var Op Const)
558 : * - etc
559 : * (where Var is a column of the table this filter belongs to)
560 : *
561 : * The simple expression has the following restrictions:
562 : * - User-defined operators are not allowed;
563 : * - User-defined functions are not allowed;
564 : * - User-defined types are not allowed;
565 : * - User-defined collations are not allowed;
566 : * - Non-immutable built-in functions are not allowed;
567 : * - System columns are not allowed.
568 : *
569 : * NOTES
570 : *
571 : * We don't allow user-defined functions/operators/types/collations because
572 : * (a) if a user drops a user-defined object used in a row filter expression or
573 : * if there is any other error while using it, the logical decoding
574 : * infrastructure won't be able to recover from such an error even if the
575 : * object is recreated again because a historic snapshot is used to evaluate
576 : * the row filter;
577 : * (b) a user-defined function can be used to access tables that could have
578 : * unpleasant results because a historic snapshot is used. That's why only
579 : * immutable built-in functions are allowed in row filter expressions.
580 : *
581 : * We don't allow system columns because currently, we don't have that
582 : * information in the tuple passed to downstream. Also, as we don't replicate
583 : * those to subscribers, there doesn't seem to be a need for a filter on those
584 : * columns.
585 : *
586 : * We can allow other node types after more analysis and testing.
587 : */
588 : static bool
589 923 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
590 : {
591 923 : char *errdetail_msg = NULL;
592 :
593 923 : if (node == NULL)
594 4 : return false;
595 :
596 919 : switch (nodeTag(node))
597 : {
598 248 : case T_Var:
599 : /* System columns are not allowed. */
600 248 : if (((Var *) node)->varattno < InvalidAttrNumber)
601 4 : errdetail_msg = _("System columns are not allowed.");
602 248 : break;
603 249 : case T_OpExpr:
604 : case T_DistinctExpr:
605 : case T_NullIfExpr:
606 : /* OK, except user-defined operators are not allowed. */
607 249 : if (((OpExpr *) node)->opno >= FirstNormalObjectId)
608 4 : errdetail_msg = _("User-defined operators are not allowed.");
609 249 : break;
610 4 : case T_ScalarArrayOpExpr:
611 : /* OK, except user-defined operators are not allowed. */
612 4 : if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
613 0 : errdetail_msg = _("User-defined operators are not allowed.");
614 :
615 : /*
616 : * We don't need to check the hashfuncid and negfuncid of
617 : * ScalarArrayOpExpr as those functions are only built for a
618 : * subquery.
619 : */
620 4 : break;
621 4 : case T_RowCompareExpr:
622 : {
623 : ListCell *opid;
624 :
625 : /* OK, except user-defined operators are not allowed. */
626 12 : foreach(opid, ((RowCompareExpr *) node)->opnos)
627 : {
628 8 : if (lfirst_oid(opid) >= FirstNormalObjectId)
629 : {
630 0 : errdetail_msg = _("User-defined operators are not allowed.");
631 0 : break;
632 : }
633 : }
634 : }
635 4 : break;
636 410 : case T_Const:
637 : case T_FuncExpr:
638 : case T_BoolExpr:
639 : case T_RelabelType:
640 : case T_CollateExpr:
641 : case T_CaseExpr:
642 : case T_CaseTestExpr:
643 : case T_ArrayExpr:
644 : case T_RowExpr:
645 : case T_CoalesceExpr:
646 : case T_MinMaxExpr:
647 : case T_XmlExpr:
648 : case T_NullTest:
649 : case T_BooleanTest:
650 : case T_List:
651 : /* OK, supported */
652 410 : break;
653 4 : default:
654 4 : errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
655 4 : break;
656 : }
657 :
658 : /*
659 : * For all the supported nodes, if we haven't already found a problem,
660 : * check the types, functions, and collations used in it. We check List
661 : * by walking through each element.
662 : */
663 919 : if (!errdetail_msg && !IsA(node, List))
664 : {
665 871 : if (exprType(node) >= FirstNormalObjectId)
666 4 : errdetail_msg = _("User-defined types are not allowed.");
667 867 : else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
668 : pstate))
669 8 : errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
670 1718 : else if (exprCollation(node) >= FirstNormalObjectId ||
671 859 : exprInputCollation(node) >= FirstNormalObjectId)
672 4 : errdetail_msg = _("User-defined collations are not allowed.");
673 : }
674 :
675 : /*
676 : * If we found a problem in this node, throw error now. Otherwise keep
677 : * going.
678 : */
679 919 : if (errdetail_msg)
680 28 : ereport(ERROR,
681 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
682 : errmsg("invalid publication WHERE expression"),
683 : errdetail_internal("%s", errdetail_msg),
684 : parser_errposition(pstate, exprLocation(node))));
685 :
686 891 : return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
687 : pstate);
688 : }
689 :
690 : /*
691 : * Check if the row filter expression is a "simple expression".
692 : *
693 : * See check_simple_rowfilter_expr_walker for details.
694 : */
695 : static bool
696 241 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
697 : {
698 241 : return check_simple_rowfilter_expr_walker(node, pstate);
699 : }
700 :
701 : /*
702 : * Transform the publication WHERE expression for all the relations in the list,
703 : * ensuring it is coerced to boolean and necessary collation information is
704 : * added if required, and add a new nsitem/RTE for the associated relation to
705 : * the ParseState's namespace list.
706 : *
707 : * Also check the publication row filter expression and throw an error if
708 : * anything not permitted or unexpected is encountered.
709 : */
710 : static void
711 747 : TransformPubWhereClauses(List *tables, const char *queryString,
712 : bool pubviaroot)
713 : {
714 : ListCell *lc;
715 :
716 1495 : foreach(lc, tables)
717 : {
718 : ParseNamespaceItem *nsitem;
719 788 : Node *whereclause = NULL;
720 : ParseState *pstate;
721 788 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
722 :
723 788 : if (pri->whereClause == NULL)
724 535 : continue;
725 :
726 : /*
727 : * If the publication doesn't publish changes via the root partitioned
728 : * table, the partition's row filter will be used. So disallow using
729 : * WHERE clause on partitioned table in this case.
730 : */
731 253 : if (!pubviaroot &&
732 235 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
733 4 : ereport(ERROR,
734 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
735 : errmsg("cannot use publication WHERE clause for relation \"%s\"",
736 : RelationGetRelationName(pri->relation)),
737 : errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
738 : "publish_via_partition_root")));
739 :
740 : /*
741 : * A fresh pstate is required so that we only have "this" table in its
742 : * rangetable
743 : */
744 249 : pstate = make_parsestate(NULL);
745 249 : pstate->p_sourcetext = queryString;
746 249 : nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
747 : AccessShareLock, NULL,
748 : false, false);
749 249 : addNSItemToQuery(pstate, nsitem, false, true, true);
750 :
751 249 : whereclause = transformWhereClause(pstate,
752 249 : copyObject(pri->whereClause),
753 : EXPR_KIND_WHERE,
754 : "PUBLICATION WHERE");
755 :
756 : /* Fix up collation information */
757 241 : assign_expr_collations(pstate, whereclause);
758 :
759 241 : whereclause = expand_generated_columns_in_expr(whereclause, pri->relation, 1);
760 :
761 : /*
762 : * We allow only simple expressions in row filters. See
763 : * check_simple_rowfilter_expr_walker.
764 : */
765 241 : check_simple_rowfilter_expr(whereclause, pstate);
766 :
767 213 : free_parsestate(pstate);
768 :
769 213 : pri->whereClause = whereclause;
770 : }
771 707 : }
772 :
773 :
774 : /*
775 : * Given a list of tables that are going to be added to a publication,
776 : * verify that they fulfill the necessary preconditions, namely: no tables
777 : * have a column list if any schema is published; and partitioned tables do
778 : * not have column lists if publish_via_partition_root is not set.
779 : *
780 : * 'publish_schema' indicates that the publication contains any TABLES IN
781 : * SCHEMA elements (newly added in this command, or preexisting).
782 : * 'pubviaroot' is the value of publish_via_partition_root.
783 : */
784 : static void
785 707 : CheckPubRelationColumnList(char *pubname, List *tables,
786 : bool publish_schema, bool pubviaroot)
787 : {
788 : ListCell *lc;
789 :
790 1435 : foreach(lc, tables)
791 : {
792 748 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
793 :
794 748 : if (pri->columns == NIL)
795 492 : continue;
796 :
797 : /*
798 : * Disallow specifying column list if any schema is in the
799 : * publication.
800 : *
801 : * XXX We could instead just forbid the case when the publication
802 : * tries to publish the table with a column list and a schema for that
803 : * table. However, if we do that then we need a restriction during
804 : * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
805 : * seem to be a good idea.
806 : */
807 256 : if (publish_schema)
808 16 : ereport(ERROR,
809 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
810 : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
811 : get_namespace_name(RelationGetNamespace(pri->relation)),
812 : RelationGetRelationName(pri->relation), pubname),
813 : errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
814 :
815 : /*
816 : * If the publication doesn't publish changes via the root partitioned
817 : * table, the partition's column list will be used. So disallow using
818 : * a column list on the partitioned table in this case.
819 : */
820 240 : if (!pubviaroot &&
821 194 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
822 4 : ereport(ERROR,
823 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
824 : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
825 : get_namespace_name(RelationGetNamespace(pri->relation)),
826 : RelationGetRelationName(pri->relation), pubname),
827 : errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
828 : "publish_via_partition_root")));
829 : }
830 687 : }
831 :
832 : /*
833 : * Create new publication.
834 : */
835 : ObjectAddress
836 590 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
837 : {
838 : Relation rel;
839 : ObjectAddress myself;
840 : Oid puboid;
841 : bool nulls[Natts_pg_publication];
842 : Datum values[Natts_pg_publication];
843 : HeapTuple tup;
844 : bool publish_given;
845 : PublicationActions pubactions;
846 : bool publish_via_partition_root_given;
847 : bool publish_via_partition_root;
848 : bool publish_generated_columns_given;
849 : char publish_generated_columns;
850 : AclResult aclresult;
851 590 : List *relations = NIL;
852 590 : List *exceptrelations = NIL;
853 590 : List *schemaidlist = NIL;
854 :
855 : /* must have CREATE privilege on database */
856 590 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
857 590 : if (aclresult != ACLCHECK_OK)
858 4 : aclcheck_error(aclresult, OBJECT_DATABASE,
859 4 : get_database_name(MyDatabaseId));
860 :
861 : /* FOR ALL TABLES and FOR ALL SEQUENCES requires superuser */
862 586 : if (!superuser())
863 : {
864 15 : if (stmt->for_all_tables || stmt->for_all_sequences)
865 0 : ereport(ERROR,
866 : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
867 : errmsg("must be superuser to create a FOR ALL TABLES or ALL SEQUENCES publication"));
868 : }
869 :
870 586 : rel = table_open(PublicationRelationId, RowExclusiveLock);
871 :
872 : /* Check if name is used */
873 586 : puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
874 : CStringGetDatum(stmt->pubname));
875 586 : if (OidIsValid(puboid))
876 4 : ereport(ERROR,
877 : (errcode(ERRCODE_DUPLICATE_OBJECT),
878 : errmsg("publication \"%s\" already exists",
879 : stmt->pubname)));
880 :
881 : /* Form a tuple. */
882 582 : memset(values, 0, sizeof(values));
883 582 : memset(nulls, false, sizeof(nulls));
884 :
885 582 : values[Anum_pg_publication_pubname - 1] =
886 582 : DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
887 582 : values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
888 :
889 582 : parse_publication_options(pstate,
890 : stmt->options,
891 : &publish_given, &pubactions,
892 : &publish_via_partition_root_given,
893 : &publish_via_partition_root,
894 : &publish_generated_columns_given,
895 : &publish_generated_columns);
896 :
897 558 : if (stmt->for_all_sequences &&
898 17 : (publish_given || publish_via_partition_root_given ||
899 : publish_generated_columns_given))
900 1 : ereport(NOTICE,
901 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
902 : errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
903 :
904 558 : puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
905 : Anum_pg_publication_oid);
906 558 : values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
907 558 : values[Anum_pg_publication_puballtables - 1] =
908 558 : BoolGetDatum(stmt->for_all_tables);
909 558 : values[Anum_pg_publication_puballsequences - 1] =
910 558 : BoolGetDatum(stmt->for_all_sequences);
911 558 : values[Anum_pg_publication_pubinsert - 1] =
912 558 : BoolGetDatum(pubactions.pubinsert);
913 558 : values[Anum_pg_publication_pubupdate - 1] =
914 558 : BoolGetDatum(pubactions.pubupdate);
915 558 : values[Anum_pg_publication_pubdelete - 1] =
916 558 : BoolGetDatum(pubactions.pubdelete);
917 558 : values[Anum_pg_publication_pubtruncate - 1] =
918 558 : BoolGetDatum(pubactions.pubtruncate);
919 558 : values[Anum_pg_publication_pubviaroot - 1] =
920 558 : BoolGetDatum(publish_via_partition_root);
921 558 : values[Anum_pg_publication_pubgencols - 1] =
922 558 : CharGetDatum(publish_generated_columns);
923 :
924 558 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
925 :
926 : /* Insert tuple into catalog. */
927 558 : CatalogTupleInsert(rel, tup);
928 558 : heap_freetuple(tup);
929 :
930 558 : recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
931 :
932 558 : ObjectAddressSet(myself, PublicationRelationId, puboid);
933 :
934 : /* Make the changes visible. */
935 558 : CommandCounterIncrement();
936 :
937 : /* Associate objects with the publication. */
938 558 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
939 : &exceptrelations, &schemaidlist);
940 :
941 546 : if (stmt->for_all_tables)
942 : {
943 : /* Process EXCEPT table list */
944 100 : if (exceptrelations != NIL)
945 : {
946 : List *rels;
947 :
948 35 : rels = OpenTableList(exceptrelations);
949 35 : PublicationAddTables(puboid, rels, true, NULL);
950 31 : CloseTableList(rels);
951 : }
952 :
953 : /*
954 : * Invalidate relcache so that publication info is rebuilt. Sequences
955 : * publication doesn't require invalidation, as replica identity
956 : * checks don't apply to them.
957 : */
958 96 : CacheInvalidateRelcacheAll();
959 : }
960 446 : else if (!stmt->for_all_sequences)
961 : {
962 : /* FOR TABLES IN SCHEMA requires superuser */
963 434 : if (schemaidlist != NIL && !superuser())
964 4 : ereport(ERROR,
965 : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
966 : errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
967 :
968 430 : if (relations != NIL)
969 : {
970 : List *rels;
971 :
972 285 : rels = OpenTableList(relations);
973 265 : TransformPubWhereClauses(rels, pstate->p_sourcetext,
974 : publish_via_partition_root);
975 :
976 249 : CheckPubRelationColumnList(stmt->pubname, rels,
977 : schemaidlist != NIL,
978 : publish_via_partition_root);
979 :
980 245 : PublicationAddTables(puboid, rels, true, NULL);
981 228 : CloseTableList(rels);
982 : }
983 :
984 373 : if (schemaidlist != NIL)
985 : {
986 : /*
987 : * Schema lock is held until the publication is created to prevent
988 : * concurrent schema deletion.
989 : */
990 100 : LockSchemaList(schemaidlist);
991 100 : PublicationAddSchemas(puboid, schemaidlist, true, NULL);
992 : }
993 : }
994 :
995 477 : table_close(rel, RowExclusiveLock);
996 :
997 477 : InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
998 :
999 : /*
1000 : * We don't need this warning message when wal_level >= 'replica' since
1001 : * logical decoding is automatically enabled up on a logical slot
1002 : * creation.
1003 : */
1004 477 : if (wal_level < WAL_LEVEL_REPLICA)
1005 93 : ereport(WARNING,
1006 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1007 : errmsg("logical decoding must be enabled to publish logical changes"),
1008 : errhint("Before creating subscriptions, ensure that \"wal_level\" is set to \"replica\" or higher.")));
1009 :
1010 477 : return myself;
1011 : }
1012 :
1013 : /*
1014 : * Change options of a publication.
1015 : */
1016 : static void
1017 84 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
1018 : Relation rel, HeapTuple tup)
1019 : {
1020 : bool nulls[Natts_pg_publication];
1021 : bool replaces[Natts_pg_publication];
1022 : Datum values[Natts_pg_publication];
1023 : bool publish_given;
1024 : PublicationActions pubactions;
1025 : bool publish_via_partition_root_given;
1026 : bool publish_via_partition_root;
1027 : bool publish_generated_columns_given;
1028 : char publish_generated_columns;
1029 : ObjectAddress obj;
1030 : Form_pg_publication pubform;
1031 84 : List *root_relids = NIL;
1032 : ListCell *lc;
1033 :
1034 84 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1035 :
1036 84 : parse_publication_options(pstate,
1037 : stmt->options,
1038 : &publish_given, &pubactions,
1039 : &publish_via_partition_root_given,
1040 : &publish_via_partition_root,
1041 : &publish_generated_columns_given,
1042 : &publish_generated_columns);
1043 :
1044 84 : if (pubform->puballsequences &&
1045 8 : (publish_given || publish_via_partition_root_given ||
1046 : publish_generated_columns_given))
1047 8 : ereport(NOTICE,
1048 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1049 : errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
1050 :
1051 : /*
1052 : * If the publication doesn't publish changes via the root partitioned
1053 : * table, the partition's row filter and column list will be used. So
1054 : * disallow using WHERE clause and column lists on partitioned table in
1055 : * this case.
1056 : */
1057 84 : if (!pubform->puballtables && publish_via_partition_root_given &&
1058 53 : !publish_via_partition_root)
1059 : {
1060 : /*
1061 : * Lock the publication so nobody else can do anything with it. This
1062 : * prevents concurrent alter to add partitioned table(s) with WHERE
1063 : * clause(s) and/or column lists which we don't allow when not
1064 : * publishing via root.
1065 : */
1066 32 : LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
1067 : AccessShareLock);
1068 :
1069 32 : root_relids = GetIncludedPublicationRelations(pubform->oid,
1070 : PUBLICATION_PART_ROOT);
1071 :
1072 56 : foreach(lc, root_relids)
1073 : {
1074 32 : Oid relid = lfirst_oid(lc);
1075 : HeapTuple rftuple;
1076 : char relkind;
1077 : char *relname;
1078 : bool has_rowfilter;
1079 : bool has_collist;
1080 :
1081 : /*
1082 : * Beware: we don't have lock on the relations, so cope silently
1083 : * with the cache lookups returning NULL.
1084 : */
1085 :
1086 32 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1087 : ObjectIdGetDatum(relid),
1088 : ObjectIdGetDatum(pubform->oid));
1089 32 : if (!HeapTupleIsValid(rftuple))
1090 0 : continue;
1091 32 : has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
1092 32 : has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
1093 32 : if (!has_rowfilter && !has_collist)
1094 : {
1095 8 : ReleaseSysCache(rftuple);
1096 8 : continue;
1097 : }
1098 :
1099 24 : relkind = get_rel_relkind(relid);
1100 24 : if (relkind != RELKIND_PARTITIONED_TABLE)
1101 : {
1102 16 : ReleaseSysCache(rftuple);
1103 16 : continue;
1104 : }
1105 8 : relname = get_rel_name(relid);
1106 8 : if (relname == NULL) /* table concurrently dropped */
1107 : {
1108 0 : ReleaseSysCache(rftuple);
1109 0 : continue;
1110 : }
1111 :
1112 8 : if (has_rowfilter)
1113 4 : ereport(ERROR,
1114 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1115 : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1116 : "publish_via_partition_root",
1117 : stmt->pubname),
1118 : errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1119 : relname, "publish_via_partition_root")));
1120 : Assert(has_collist);
1121 4 : ereport(ERROR,
1122 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1123 : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1124 : "publish_via_partition_root",
1125 : stmt->pubname),
1126 : errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1127 : relname, "publish_via_partition_root")));
1128 : }
1129 : }
1130 :
1131 : /* Everything ok, form a new tuple. */
1132 76 : memset(values, 0, sizeof(values));
1133 76 : memset(nulls, false, sizeof(nulls));
1134 76 : memset(replaces, false, sizeof(replaces));
1135 :
1136 76 : if (publish_given)
1137 : {
1138 22 : values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
1139 22 : replaces[Anum_pg_publication_pubinsert - 1] = true;
1140 :
1141 22 : values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
1142 22 : replaces[Anum_pg_publication_pubupdate - 1] = true;
1143 :
1144 22 : values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
1145 22 : replaces[Anum_pg_publication_pubdelete - 1] = true;
1146 :
1147 22 : values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
1148 22 : replaces[Anum_pg_publication_pubtruncate - 1] = true;
1149 : }
1150 :
1151 76 : if (publish_via_partition_root_given)
1152 : {
1153 46 : values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
1154 46 : replaces[Anum_pg_publication_pubviaroot - 1] = true;
1155 : }
1156 :
1157 76 : if (publish_generated_columns_given)
1158 : {
1159 8 : values[Anum_pg_publication_pubgencols - 1] = CharGetDatum(publish_generated_columns);
1160 8 : replaces[Anum_pg_publication_pubgencols - 1] = true;
1161 : }
1162 :
1163 76 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1164 : replaces);
1165 :
1166 : /* Update the catalog. */
1167 76 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1168 :
1169 76 : CommandCounterIncrement();
1170 :
1171 76 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1172 :
1173 : /* Invalidate the relcache. */
1174 76 : if (pubform->puballtables)
1175 : {
1176 13 : CacheInvalidateRelcacheAll();
1177 : }
1178 : else
1179 : {
1180 63 : List *relids = NIL;
1181 63 : List *schemarelids = NIL;
1182 :
1183 : /*
1184 : * For any partitioned tables contained in the publication, we must
1185 : * invalidate all partitions contained in the respective partition
1186 : * trees, not just those explicitly mentioned in the publication.
1187 : */
1188 63 : if (root_relids == NIL)
1189 39 : relids = GetIncludedPublicationRelations(pubform->oid,
1190 : PUBLICATION_PART_ALL);
1191 : else
1192 : {
1193 : /*
1194 : * We already got tables explicitly mentioned in the publication.
1195 : * Now get all partitions for the partitioned table in the list.
1196 : */
1197 48 : foreach(lc, root_relids)
1198 24 : relids = GetPubPartitionOptionRelations(relids,
1199 : PUBLICATION_PART_ALL,
1200 : lfirst_oid(lc));
1201 : }
1202 :
1203 63 : schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
1204 : PUBLICATION_PART_ALL);
1205 63 : relids = list_concat_unique_oid(relids, schemarelids);
1206 :
1207 63 : InvalidatePublicationRels(relids);
1208 : }
1209 :
1210 76 : ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
1211 76 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1212 : (Node *) stmt);
1213 :
1214 76 : InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
1215 76 : }
1216 :
1217 : /*
1218 : * Invalidate the relations.
1219 : */
1220 : void
1221 1600 : InvalidatePublicationRels(List *relids)
1222 : {
1223 : /*
1224 : * We don't want to send too many individual messages, at some point it's
1225 : * cheaper to just reset whole relcache.
1226 : */
1227 1600 : if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
1228 : {
1229 : ListCell *lc;
1230 :
1231 13707 : foreach(lc, relids)
1232 12107 : CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
1233 : }
1234 : else
1235 0 : CacheInvalidateRelcacheAll();
1236 1600 : }
1237 :
1238 : /*
1239 : * Add or remove table to/from publication.
1240 : */
1241 : static void
1242 617 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
1243 : List *tables, const char *queryString,
1244 : bool publish_schema)
1245 : {
1246 617 : List *rels = NIL;
1247 617 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1248 617 : Oid pubid = pubform->oid;
1249 :
1250 : /*
1251 : * Nothing to do if no objects, except in SET: for that it is quite
1252 : * possible that user has not specified any tables in which case we need
1253 : * to remove all the existing tables.
1254 : */
1255 617 : if (!tables && stmt->action != AP_SetObjects)
1256 48 : return;
1257 :
1258 569 : rels = OpenTableList(tables);
1259 :
1260 569 : if (stmt->action == AP_AddObjects)
1261 : {
1262 186 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1263 :
1264 174 : publish_schema |= is_schema_publication(pubid);
1265 :
1266 174 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1267 174 : pubform->pubviaroot);
1268 :
1269 166 : PublicationAddTables(pubid, rels, false, stmt);
1270 : }
1271 383 : else if (stmt->action == AP_DropObjects)
1272 66 : PublicationDropTables(pubid, rels, false);
1273 : else /* AP_SetObjects */
1274 : {
1275 317 : List *oldrelids = NIL;
1276 317 : List *delrels = NIL;
1277 : ListCell *oldlc;
1278 :
1279 317 : if (stmt->for_all_tables || stmt->for_all_sequences)
1280 : {
1281 : /*
1282 : * In FOR ALL TABLES mode, relations are tracked as exclusions
1283 : * (EXCEPT TABLES). Fetch the current excluded relations so they
1284 : * can be reconciled with the specified EXCEPT list.
1285 : *
1286 : * This applies only if the existing publication is already
1287 : * defined as FOR ALL TABLES; otherwise, there are no exclusion
1288 : * entries to process.
1289 : */
1290 21 : if (pubform->puballtables)
1291 : {
1292 17 : oldrelids = GetExcludedPublicationTables(pubid,
1293 : PUBLICATION_PART_ROOT);
1294 : }
1295 : }
1296 : else
1297 : {
1298 296 : oldrelids = GetIncludedPublicationRelations(pubid,
1299 : PUBLICATION_PART_ROOT);
1300 :
1301 296 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1302 :
1303 284 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1304 284 : pubform->pubviaroot);
1305 : }
1306 :
1307 : /*
1308 : * To recreate the relation list for the publication, look for
1309 : * existing relations that do not need to be dropped.
1310 : */
1311 571 : foreach(oldlc, oldrelids)
1312 : {
1313 282 : Oid oldrelid = lfirst_oid(oldlc);
1314 : ListCell *newlc;
1315 : PublicationRelInfo *oldrel;
1316 282 : bool found = false;
1317 : HeapTuple rftuple;
1318 282 : Node *oldrelwhereclause = NULL;
1319 282 : Bitmapset *oldcolumns = NULL;
1320 :
1321 : /* look up the cache for the old relmap */
1322 282 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1323 : ObjectIdGetDatum(oldrelid),
1324 : ObjectIdGetDatum(pubid));
1325 :
1326 : /*
1327 : * See if the existing relation currently has a WHERE clause or a
1328 : * column list. We need to compare those too.
1329 : */
1330 282 : if (HeapTupleIsValid(rftuple))
1331 : {
1332 282 : bool isnull = true;
1333 : Datum whereClauseDatum;
1334 : Datum columnListDatum;
1335 :
1336 : /* Load the WHERE clause for this table. */
1337 282 : whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1338 : Anum_pg_publication_rel_prqual,
1339 : &isnull);
1340 282 : if (!isnull)
1341 139 : oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
1342 :
1343 : /* Transform the int2vector column list to a bitmap. */
1344 282 : columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1345 : Anum_pg_publication_rel_prattrs,
1346 : &isnull);
1347 :
1348 282 : if (!isnull)
1349 90 : oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
1350 :
1351 282 : ReleaseSysCache(rftuple);
1352 : }
1353 :
1354 538 : foreach(newlc, rels)
1355 : {
1356 : PublicationRelInfo *newpubrel;
1357 : Oid newrelid;
1358 278 : Bitmapset *newcolumns = NULL;
1359 :
1360 278 : newpubrel = (PublicationRelInfo *) lfirst(newlc);
1361 278 : newrelid = RelationGetRelid(newpubrel->relation);
1362 :
1363 : /*
1364 : * Validate the column list. If the column list or WHERE
1365 : * clause changes, then the validation done here will be
1366 : * duplicated inside PublicationAddTables(). The validation
1367 : * is cheap enough that that seems harmless.
1368 : */
1369 278 : newcolumns = pub_collist_validate(newpubrel->relation,
1370 : newpubrel->columns);
1371 :
1372 : /*
1373 : * Check if any of the new set of relations matches with the
1374 : * existing relations in the publication. Additionally, if the
1375 : * relation has an associated WHERE clause, check the WHERE
1376 : * expressions also match. Same for the column list. Drop the
1377 : * rest.
1378 : */
1379 270 : if (newrelid == oldrelid)
1380 : {
1381 194 : if (equal(oldrelwhereclause, newpubrel->whereClause) &&
1382 56 : bms_equal(oldcolumns, newcolumns))
1383 : {
1384 14 : found = true;
1385 14 : break;
1386 : }
1387 : }
1388 : }
1389 :
1390 : /*
1391 : * Add the non-matched relations to a list so that they can be
1392 : * dropped.
1393 : */
1394 274 : if (!found)
1395 : {
1396 260 : oldrel = palloc_object(PublicationRelInfo);
1397 260 : oldrel->whereClause = NULL;
1398 260 : oldrel->columns = NIL;
1399 260 : oldrel->except = false;
1400 260 : oldrel->relation = table_open(oldrelid,
1401 : ShareUpdateExclusiveLock);
1402 260 : delrels = lappend(delrels, oldrel);
1403 : }
1404 : }
1405 :
1406 : /* And drop them. */
1407 289 : PublicationDropTables(pubid, delrels, true);
1408 :
1409 : /*
1410 : * Don't bother calculating the difference for adding, we'll catch and
1411 : * skip existing ones when doing catalog update.
1412 : */
1413 289 : PublicationAddTables(pubid, rels, true, stmt);
1414 :
1415 289 : CloseTableList(delrels);
1416 : }
1417 :
1418 481 : CloseTableList(rels);
1419 : }
1420 :
1421 : /*
1422 : * Alter the publication schemas.
1423 : *
1424 : * Add or remove schemas to/from publication.
1425 : */
1426 : static void
1427 529 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
1428 : HeapTuple tup, List *schemaidlist)
1429 : {
1430 529 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1431 :
1432 : /*
1433 : * Nothing to do if no objects, except in SET: for that it is quite
1434 : * possible that user has not specified any schemas in which case we need
1435 : * to remove all the existing schemas.
1436 : */
1437 529 : if (!schemaidlist && stmt->action != AP_SetObjects)
1438 192 : return;
1439 :
1440 : /*
1441 : * Schema lock is held until the publication is altered to prevent
1442 : * concurrent schema deletion.
1443 : */
1444 337 : LockSchemaList(schemaidlist);
1445 337 : if (stmt->action == AP_AddObjects)
1446 : {
1447 : ListCell *lc;
1448 : List *reloids;
1449 :
1450 23 : reloids = GetIncludedPublicationRelations(pubform->oid,
1451 : PUBLICATION_PART_ROOT);
1452 :
1453 32 : foreach(lc, reloids)
1454 : {
1455 : HeapTuple coltuple;
1456 :
1457 13 : coltuple = SearchSysCache2(PUBLICATIONRELMAP,
1458 : ObjectIdGetDatum(lfirst_oid(lc)),
1459 : ObjectIdGetDatum(pubform->oid));
1460 :
1461 13 : if (!HeapTupleIsValid(coltuple))
1462 0 : continue;
1463 :
1464 : /*
1465 : * Disallow adding schema if column list is already part of the
1466 : * publication. See CheckPubRelationColumnList.
1467 : */
1468 13 : if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
1469 4 : ereport(ERROR,
1470 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1471 : errmsg("cannot add schema to publication \"%s\"",
1472 : stmt->pubname),
1473 : errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1474 :
1475 9 : ReleaseSysCache(coltuple);
1476 : }
1477 :
1478 19 : PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
1479 : }
1480 314 : else if (stmt->action == AP_DropObjects)
1481 25 : PublicationDropSchemas(pubform->oid, schemaidlist, false);
1482 : else /* AP_SetObjects */
1483 : {
1484 289 : List *oldschemaids = GetPublicationSchemas(pubform->oid);
1485 289 : List *delschemas = NIL;
1486 :
1487 : /* Identify which schemas should be dropped */
1488 289 : delschemas = list_difference_oid(oldschemaids, schemaidlist);
1489 :
1490 : /*
1491 : * Schema lock is held until the publication is altered to prevent
1492 : * concurrent schema deletion.
1493 : */
1494 289 : LockSchemaList(delschemas);
1495 :
1496 : /* And drop them */
1497 289 : PublicationDropSchemas(pubform->oid, delschemas, true);
1498 :
1499 : /*
1500 : * Don't bother calculating the difference for adding, we'll catch and
1501 : * skip existing ones when doing catalog update.
1502 : */
1503 289 : PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
1504 : }
1505 : }
1506 :
1507 : /*
1508 : * Check if relations and schemas can be in a given publication and throw
1509 : * appropriate error if not.
1510 : */
1511 : static void
1512 681 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
1513 : List *tables, List *schemaidlist)
1514 : {
1515 681 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1516 :
1517 681 : if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
1518 67 : schemaidlist && !superuser())
1519 4 : ereport(ERROR,
1520 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1521 : errmsg("must be superuser to add or set schemas")));
1522 :
1523 677 : if (stmt->for_all_tables && !superuser())
1524 8 : ereport(ERROR,
1525 : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1526 : errmsg("must be superuser to set ALL TABLES"));
1527 :
1528 669 : if (stmt->for_all_sequences && !superuser())
1529 4 : ereport(ERROR,
1530 : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1531 : errmsg("must be superuser to set ALL SEQUENCES"));
1532 :
1533 : /*
1534 : * Check that user is allowed to manipulate the publication tables in
1535 : * schema
1536 : */
1537 665 : if (schemaidlist && (pubform->puballtables || pubform->puballsequences))
1538 : {
1539 12 : if (pubform->puballtables && pubform->puballsequences)
1540 0 : ereport(ERROR,
1541 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1542 : errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
1543 : NameStr(pubform->pubname)),
1544 : errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
1545 12 : else if (pubform->puballtables)
1546 12 : ereport(ERROR,
1547 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1548 : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1549 : NameStr(pubform->pubname)),
1550 : errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications."));
1551 : else
1552 0 : ereport(ERROR,
1553 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1554 : errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
1555 : NameStr(pubform->pubname)),
1556 : errdetail("Schemas cannot be added to or dropped from FOR ALL SEQUENCES publications."));
1557 : }
1558 :
1559 : /* Check that user is allowed to manipulate the publication tables. */
1560 653 : if (tables && (pubform->puballtables || pubform->puballsequences))
1561 : {
1562 12 : if (pubform->puballtables && pubform->puballsequences)
1563 0 : ereport(ERROR,
1564 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1565 : errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
1566 : NameStr(pubform->pubname)),
1567 : errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
1568 12 : else if (pubform->puballtables)
1569 12 : ereport(ERROR,
1570 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1571 : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1572 : NameStr(pubform->pubname)),
1573 : errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES publications."));
1574 : else
1575 0 : ereport(ERROR,
1576 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1577 : errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
1578 : NameStr(pubform->pubname)),
1579 : errdetail("Tables or sequences cannot be added to or dropped from FOR ALL SEQUENCES publications."));
1580 : }
1581 :
1582 641 : if (stmt->for_all_tables || stmt->for_all_sequences)
1583 : {
1584 : /*
1585 : * If the publication already contains specific tables or schemas, we
1586 : * prevent switching to a ALL state.
1587 : */
1588 78 : if (is_table_publication(pubform->oid) ||
1589 33 : is_schema_publication(pubform->oid))
1590 : {
1591 24 : ereport(ERROR,
1592 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1593 : stmt->for_all_tables ?
1594 : errmsg("publication \"%s\" does not support ALL TABLES operations", NameStr(pubform->pubname)) :
1595 : errmsg("publication \"%s\" does not support ALL SEQUENCES operations", NameStr(pubform->pubname)),
1596 : errdetail("This operation requires the publication to be defined as FOR ALL TABLES/SEQUENCES or to be empty."));
1597 : }
1598 : }
1599 617 : }
1600 :
1601 : /*
1602 : * Update FOR ALL TABLES / FOR ALL SEQUENCES flags of a publication.
1603 : */
1604 : static void
1605 517 : AlterPublicationAllFlags(AlterPublicationStmt *stmt, Relation rel,
1606 : HeapTuple tup)
1607 : {
1608 : Form_pg_publication pubform;
1609 517 : bool nulls[Natts_pg_publication] = {0};
1610 517 : bool replaces[Natts_pg_publication] = {0};
1611 517 : Datum values[Natts_pg_publication] = {0};
1612 517 : bool dirty = false;
1613 :
1614 517 : if (!stmt->for_all_tables && !stmt->for_all_sequences)
1615 496 : return;
1616 :
1617 21 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1618 :
1619 : /* Update FOR ALL TABLES flag if changed */
1620 21 : if (stmt->for_all_tables != pubform->puballtables)
1621 : {
1622 8 : values[Anum_pg_publication_puballtables - 1] =
1623 8 : BoolGetDatum(stmt->for_all_tables);
1624 8 : replaces[Anum_pg_publication_puballtables - 1] = true;
1625 8 : dirty = true;
1626 : }
1627 :
1628 : /* Update FOR ALL SEQUENCES flag if changed */
1629 21 : if (stmt->for_all_sequences != pubform->puballsequences)
1630 : {
1631 12 : values[Anum_pg_publication_puballsequences - 1] =
1632 12 : BoolGetDatum(stmt->for_all_sequences);
1633 12 : replaces[Anum_pg_publication_puballsequences - 1] = true;
1634 12 : dirty = true;
1635 : }
1636 :
1637 21 : if (dirty)
1638 : {
1639 12 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values,
1640 : nulls, replaces);
1641 12 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1642 12 : CommandCounterIncrement();
1643 :
1644 : /* For ALL TABLES, we must invalidate all relcache entries */
1645 12 : if (replaces[Anum_pg_publication_puballtables - 1])
1646 8 : CacheInvalidateRelcacheAll();
1647 : }
1648 : }
1649 :
1650 : /*
1651 : * Alter the existing publication.
1652 : *
1653 : * This is dispatcher function for AlterPublicationOptions,
1654 : * AlterPublicationSchemas and AlterPublicationTables.
1655 : */
1656 : void
1657 777 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
1658 : {
1659 : Relation rel;
1660 : HeapTuple tup;
1661 : Form_pg_publication pubform;
1662 :
1663 777 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1664 :
1665 777 : tup = SearchSysCacheCopy1(PUBLICATIONNAME,
1666 : CStringGetDatum(stmt->pubname));
1667 :
1668 777 : if (!HeapTupleIsValid(tup))
1669 0 : ereport(ERROR,
1670 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1671 : errmsg("publication \"%s\" does not exist",
1672 : stmt->pubname)));
1673 :
1674 777 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1675 :
1676 : /* must be owner */
1677 777 : if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
1678 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
1679 0 : stmt->pubname);
1680 :
1681 777 : if (stmt->options)
1682 84 : AlterPublicationOptions(pstate, stmt, rel, tup);
1683 : else
1684 : {
1685 693 : List *relations = NIL;
1686 693 : List *exceptrelations = NIL;
1687 693 : List *schemaidlist = NIL;
1688 693 : Oid pubid = pubform->oid;
1689 :
1690 693 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
1691 : &exceptrelations, &schemaidlist);
1692 :
1693 681 : CheckAlterPublication(stmt, tup, relations, schemaidlist);
1694 :
1695 617 : heap_freetuple(tup);
1696 :
1697 : /* Lock the publication so nobody else can do anything with it. */
1698 617 : LockDatabaseObject(PublicationRelationId, pubid, 0,
1699 : AccessExclusiveLock);
1700 :
1701 : /*
1702 : * It is possible that by the time we acquire the lock on publication,
1703 : * concurrent DDL has removed it. We can test this by checking the
1704 : * existence of publication. We get the tuple again to avoid the risk
1705 : * of any publication option getting changed.
1706 : */
1707 617 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1708 617 : if (!HeapTupleIsValid(tup))
1709 0 : ereport(ERROR,
1710 : errcode(ERRCODE_UNDEFINED_OBJECT),
1711 : errmsg("publication \"%s\" does not exist",
1712 : stmt->pubname));
1713 :
1714 617 : relations = list_concat(relations, exceptrelations);
1715 617 : AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
1716 : schemaidlist != NIL);
1717 529 : AlterPublicationSchemas(stmt, tup, schemaidlist);
1718 517 : AlterPublicationAllFlags(stmt, rel, tup);
1719 : }
1720 :
1721 : /* Cleanup. */
1722 593 : heap_freetuple(tup);
1723 593 : table_close(rel, RowExclusiveLock);
1724 593 : }
1725 :
1726 : /*
1727 : * Remove relation from publication by mapping OID.
1728 : */
1729 : void
1730 589 : RemovePublicationRelById(Oid proid)
1731 : {
1732 : Relation rel;
1733 : HeapTuple tup;
1734 : Form_pg_publication_rel pubrel;
1735 589 : List *relids = NIL;
1736 :
1737 589 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
1738 :
1739 589 : tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
1740 :
1741 589 : if (!HeapTupleIsValid(tup))
1742 0 : elog(ERROR, "cache lookup failed for publication table %u",
1743 : proid);
1744 :
1745 589 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1746 :
1747 : /*
1748 : * Invalidate relcache so that publication info is rebuilt.
1749 : *
1750 : * For the partitioned tables, we must invalidate all partitions contained
1751 : * in the respective partition hierarchies, not just the one explicitly
1752 : * mentioned in the publication. This is required because we implicitly
1753 : * publish the child tables when the parent table is published.
1754 : */
1755 589 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
1756 : pubrel->prrelid);
1757 :
1758 589 : InvalidatePublicationRels(relids);
1759 :
1760 589 : CatalogTupleDelete(rel, &tup->t_self);
1761 :
1762 589 : ReleaseSysCache(tup);
1763 :
1764 589 : table_close(rel, RowExclusiveLock);
1765 589 : }
1766 :
1767 : /*
1768 : * Remove the publication by mapping OID.
1769 : */
1770 : void
1771 352 : RemovePublicationById(Oid pubid)
1772 : {
1773 : Relation rel;
1774 : HeapTuple tup;
1775 : Form_pg_publication pubform;
1776 :
1777 352 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1778 :
1779 352 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1780 352 : if (!HeapTupleIsValid(tup))
1781 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1782 :
1783 352 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1784 :
1785 : /* Invalidate relcache so that publication info is rebuilt. */
1786 352 : if (pubform->puballtables)
1787 75 : CacheInvalidateRelcacheAll();
1788 :
1789 352 : CatalogTupleDelete(rel, &tup->t_self);
1790 :
1791 352 : ReleaseSysCache(tup);
1792 :
1793 352 : table_close(rel, RowExclusiveLock);
1794 352 : }
1795 :
1796 : /*
1797 : * Remove schema from publication by mapping OID.
1798 : */
1799 : void
1800 127 : RemovePublicationSchemaById(Oid psoid)
1801 : {
1802 : Relation rel;
1803 : HeapTuple tup;
1804 127 : List *schemaRels = NIL;
1805 : Form_pg_publication_namespace pubsch;
1806 :
1807 127 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
1808 :
1809 127 : tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
1810 :
1811 127 : if (!HeapTupleIsValid(tup))
1812 0 : elog(ERROR, "cache lookup failed for publication schema %u", psoid);
1813 :
1814 127 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1815 :
1816 : /*
1817 : * Invalidate relcache so that publication info is rebuilt. See
1818 : * RemovePublicationRelById for why we need to consider all the
1819 : * partitions.
1820 : */
1821 127 : schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
1822 : PUBLICATION_PART_ALL);
1823 127 : InvalidatePublicationRels(schemaRels);
1824 :
1825 127 : CatalogTupleDelete(rel, &tup->t_self);
1826 :
1827 127 : ReleaseSysCache(tup);
1828 :
1829 127 : table_close(rel, RowExclusiveLock);
1830 127 : }
1831 :
1832 : /*
1833 : * Open relations specified by a PublicationTable list.
1834 : * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1835 : * add them to a publication.
1836 : */
1837 : static List *
1838 889 : OpenTableList(List *tables)
1839 : {
1840 889 : List *relids = NIL;
1841 889 : List *rels = NIL;
1842 : ListCell *lc;
1843 889 : List *relids_with_rf = NIL;
1844 889 : List *relids_with_collist = NIL;
1845 :
1846 : /*
1847 : * Open, share-lock, and check all the explicitly-specified relations
1848 : */
1849 1805 : foreach(lc, tables)
1850 : {
1851 936 : PublicationTable *t = lfirst_node(PublicationTable, lc);
1852 936 : bool recurse = t->relation->inh;
1853 : Relation rel;
1854 : Oid myrelid;
1855 : PublicationRelInfo *pub_rel;
1856 :
1857 : /* Allow query cancel in case this takes a long time */
1858 936 : CHECK_FOR_INTERRUPTS();
1859 :
1860 936 : rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
1861 932 : myrelid = RelationGetRelid(rel);
1862 :
1863 : /*
1864 : * Filter out duplicates if user specifies "foo, foo".
1865 : *
1866 : * Note that this algorithm is known to not be very efficient (O(N^2))
1867 : * but given that it only works on list of tables given to us by user
1868 : * it's deemed acceptable.
1869 : */
1870 932 : if (list_member_oid(relids, myrelid))
1871 : {
1872 : /* Disallow duplicate tables if there are any with row filters. */
1873 16 : if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
1874 8 : ereport(ERROR,
1875 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1876 : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1877 : RelationGetRelationName(rel))));
1878 :
1879 : /* Disallow duplicate tables if there are any with column lists. */
1880 8 : if (t->columns || list_member_oid(relids_with_collist, myrelid))
1881 8 : ereport(ERROR,
1882 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1883 : errmsg("conflicting or redundant column lists for table \"%s\"",
1884 : RelationGetRelationName(rel))));
1885 :
1886 0 : table_close(rel, ShareUpdateExclusiveLock);
1887 0 : continue;
1888 : }
1889 :
1890 916 : pub_rel = palloc_object(PublicationRelInfo);
1891 916 : pub_rel->relation = rel;
1892 916 : pub_rel->whereClause = t->whereClause;
1893 916 : pub_rel->columns = t->columns;
1894 916 : pub_rel->except = t->except;
1895 916 : rels = lappend(rels, pub_rel);
1896 916 : relids = lappend_oid(relids, myrelid);
1897 :
1898 916 : if (t->whereClause)
1899 260 : relids_with_rf = lappend_oid(relids_with_rf, myrelid);
1900 :
1901 916 : if (t->columns)
1902 260 : relids_with_collist = lappend_oid(relids_with_collist, myrelid);
1903 :
1904 : /*
1905 : * Add children of this rel, if requested, so that they too are added
1906 : * to the publication. A partitioned table can't have any inheritance
1907 : * children other than its partitions, which need not be explicitly
1908 : * added to the publication.
1909 : */
1910 916 : if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1911 : {
1912 : List *children;
1913 : ListCell *child;
1914 :
1915 764 : children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
1916 : NULL);
1917 :
1918 1543 : foreach(child, children)
1919 : {
1920 779 : Oid childrelid = lfirst_oid(child);
1921 :
1922 : /* Allow query cancel in case this takes a long time */
1923 779 : CHECK_FOR_INTERRUPTS();
1924 :
1925 : /*
1926 : * Skip duplicates if user specified both parent and child
1927 : * tables.
1928 : */
1929 779 : if (list_member_oid(relids, childrelid))
1930 : {
1931 : /*
1932 : * We don't allow to specify row filter for both parent
1933 : * and child table at the same time as it is not very
1934 : * clear which one should be given preference.
1935 : */
1936 764 : if (childrelid != myrelid &&
1937 0 : (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
1938 0 : ereport(ERROR,
1939 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1940 : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1941 : RelationGetRelationName(rel))));
1942 :
1943 : /*
1944 : * We don't allow to specify column list for both parent
1945 : * and child table at the same time as it is not very
1946 : * clear which one should be given preference.
1947 : */
1948 764 : if (childrelid != myrelid &&
1949 0 : (t->columns || list_member_oid(relids_with_collist, childrelid)))
1950 0 : ereport(ERROR,
1951 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1952 : errmsg("conflicting or redundant column lists for table \"%s\"",
1953 : RelationGetRelationName(rel))));
1954 :
1955 764 : continue;
1956 : }
1957 :
1958 : /* find_all_inheritors already got lock */
1959 15 : rel = table_open(childrelid, NoLock);
1960 15 : pub_rel = palloc_object(PublicationRelInfo);
1961 15 : pub_rel->relation = rel;
1962 : /* child inherits WHERE clause from parent */
1963 15 : pub_rel->whereClause = t->whereClause;
1964 :
1965 : /* child inherits column list from parent */
1966 15 : pub_rel->columns = t->columns;
1967 15 : pub_rel->except = t->except;
1968 15 : rels = lappend(rels, pub_rel);
1969 15 : relids = lappend_oid(relids, childrelid);
1970 :
1971 15 : if (t->whereClause)
1972 1 : relids_with_rf = lappend_oid(relids_with_rf, childrelid);
1973 :
1974 15 : if (t->columns)
1975 0 : relids_with_collist = lappend_oid(relids_with_collist, childrelid);
1976 : }
1977 : }
1978 : }
1979 :
1980 869 : list_free(relids);
1981 869 : list_free(relids_with_rf);
1982 :
1983 869 : return rels;
1984 : }
1985 :
1986 : /*
1987 : * Close all relations in the list.
1988 : */
1989 : static void
1990 1029 : CloseTableList(List *rels)
1991 : {
1992 : ListCell *lc;
1993 :
1994 2075 : foreach(lc, rels)
1995 : {
1996 : PublicationRelInfo *pub_rel;
1997 :
1998 1046 : pub_rel = (PublicationRelInfo *) lfirst(lc);
1999 1046 : table_close(pub_rel->relation, NoLock);
2000 : }
2001 :
2002 1029 : list_free_deep(rels);
2003 1029 : }
2004 :
2005 : /*
2006 : * Lock the schemas specified in the schema list in AccessShareLock mode in
2007 : * order to prevent concurrent schema deletion.
2008 : */
2009 : static void
2010 726 : LockSchemaList(List *schemalist)
2011 : {
2012 : ListCell *lc;
2013 :
2014 938 : foreach(lc, schemalist)
2015 : {
2016 212 : Oid schemaid = lfirst_oid(lc);
2017 :
2018 : /* Allow query cancel in case this takes a long time */
2019 212 : CHECK_FOR_INTERRUPTS();
2020 212 : LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
2021 :
2022 : /*
2023 : * It is possible that by the time we acquire the lock on schema,
2024 : * concurrent DDL has removed it. We can test this by checking the
2025 : * existence of schema.
2026 : */
2027 212 : if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
2028 0 : ereport(ERROR,
2029 : errcode(ERRCODE_UNDEFINED_SCHEMA),
2030 : errmsg("schema with OID %u does not exist", schemaid));
2031 : }
2032 726 : }
2033 :
2034 : /*
2035 : * Add listed tables to the publication.
2036 : */
2037 : static void
2038 735 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
2039 : AlterPublicationStmt *stmt)
2040 : {
2041 : ListCell *lc;
2042 :
2043 1463 : foreach(lc, rels)
2044 : {
2045 777 : PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
2046 777 : Relation rel = pub_rel->relation;
2047 : ObjectAddress obj;
2048 :
2049 : /* Must be owner of the table or superuser. */
2050 777 : if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
2051 4 : aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
2052 4 : RelationGetRelationName(rel));
2053 :
2054 773 : obj = publication_add_relation(pubid, pub_rel, if_not_exists, stmt);
2055 728 : if (stmt)
2056 : {
2057 407 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
2058 : (Node *) stmt);
2059 :
2060 407 : InvokeObjectPostCreateHook(PublicationRelRelationId,
2061 : obj.objectId, 0);
2062 : }
2063 : }
2064 686 : }
2065 :
2066 : /*
2067 : * Remove listed tables from the publication.
2068 : */
2069 : static void
2070 355 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
2071 : {
2072 : ObjectAddress obj;
2073 : ListCell *lc;
2074 : Oid prid;
2075 :
2076 673 : foreach(lc, rels)
2077 : {
2078 330 : PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
2079 330 : Relation rel = pubrel->relation;
2080 330 : Oid relid = RelationGetRelid(rel);
2081 :
2082 330 : if (pubrel->columns)
2083 0 : ereport(ERROR,
2084 : errcode(ERRCODE_SYNTAX_ERROR),
2085 : errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
2086 :
2087 330 : prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
2088 : ObjectIdGetDatum(relid),
2089 : ObjectIdGetDatum(pubid));
2090 330 : if (!OidIsValid(prid))
2091 : {
2092 8 : if (missing_ok)
2093 0 : continue;
2094 :
2095 8 : ereport(ERROR,
2096 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2097 : errmsg("relation \"%s\" is not part of the publication",
2098 : RelationGetRelationName(rel))));
2099 : }
2100 :
2101 322 : if (pubrel->whereClause)
2102 4 : ereport(ERROR,
2103 : (errcode(ERRCODE_SYNTAX_ERROR),
2104 : errmsg("cannot use a WHERE clause when removing a table from a publication")));
2105 :
2106 318 : ObjectAddressSet(obj, PublicationRelRelationId, prid);
2107 318 : performDeletion(&obj, DROP_CASCADE, 0);
2108 : }
2109 343 : }
2110 :
2111 : /*
2112 : * Add listed schemas to the publication.
2113 : */
2114 : static void
2115 408 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
2116 : AlterPublicationStmt *stmt)
2117 : {
2118 : ListCell *lc;
2119 :
2120 571 : foreach(lc, schemas)
2121 : {
2122 171 : Oid schemaid = lfirst_oid(lc);
2123 : ObjectAddress obj;
2124 :
2125 171 : obj = publication_add_schema(pubid, schemaid, if_not_exists);
2126 163 : if (stmt)
2127 : {
2128 43 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
2129 : (Node *) stmt);
2130 :
2131 43 : InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
2132 : obj.objectId, 0);
2133 : }
2134 : }
2135 400 : }
2136 :
2137 : /*
2138 : * Remove listed schemas from the publication.
2139 : */
2140 : static void
2141 314 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
2142 : {
2143 : ObjectAddress obj;
2144 : ListCell *lc;
2145 : Oid psid;
2146 :
2147 347 : foreach(lc, schemas)
2148 : {
2149 37 : Oid schemaid = lfirst_oid(lc);
2150 :
2151 37 : psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
2152 : Anum_pg_publication_namespace_oid,
2153 : ObjectIdGetDatum(schemaid),
2154 : ObjectIdGetDatum(pubid));
2155 37 : if (!OidIsValid(psid))
2156 : {
2157 4 : if (missing_ok)
2158 0 : continue;
2159 :
2160 4 : ereport(ERROR,
2161 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2162 : errmsg("tables from schema \"%s\" are not part of the publication",
2163 : get_namespace_name(schemaid))));
2164 : }
2165 :
2166 33 : ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
2167 33 : performDeletion(&obj, DROP_CASCADE, 0);
2168 : }
2169 310 : }
2170 :
2171 : /*
2172 : * Internal workhorse for changing a publication owner
2173 : */
2174 : static void
2175 26 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
2176 : {
2177 : Form_pg_publication form;
2178 :
2179 26 : form = (Form_pg_publication) GETSTRUCT(tup);
2180 :
2181 26 : if (form->pubowner == newOwnerId)
2182 6 : return;
2183 :
2184 20 : if (!superuser())
2185 : {
2186 : AclResult aclresult;
2187 :
2188 : /* Must be owner */
2189 8 : if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
2190 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
2191 0 : NameStr(form->pubname));
2192 :
2193 : /* Must be able to become new owner */
2194 8 : check_can_set_role(GetUserId(), newOwnerId);
2195 :
2196 : /* New owner must have CREATE privilege on database */
2197 8 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
2198 8 : if (aclresult != ACLCHECK_OK)
2199 0 : aclcheck_error(aclresult, OBJECT_DATABASE,
2200 0 : get_database_name(MyDatabaseId));
2201 :
2202 8 : if (!superuser_arg(newOwnerId))
2203 : {
2204 8 : if (form->puballtables || form->puballsequences ||
2205 4 : is_schema_publication(form->oid))
2206 4 : ereport(ERROR,
2207 : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2208 : errmsg("permission denied to change owner of publication \"%s\"",
2209 : NameStr(form->pubname)),
2210 : errhint("The owner of a FOR ALL TABLES or ALL SEQUENCES or TABLES IN SCHEMA publication must be a superuser."));
2211 : }
2212 : }
2213 :
2214 16 : form->pubowner = newOwnerId;
2215 16 : CatalogTupleUpdate(rel, &tup->t_self, tup);
2216 :
2217 : /* Update owner dependency reference */
2218 16 : changeDependencyOnOwner(PublicationRelationId,
2219 : form->oid,
2220 : newOwnerId);
2221 :
2222 16 : InvokeObjectPostAlterHook(PublicationRelationId,
2223 : form->oid, 0);
2224 : }
2225 :
2226 : /*
2227 : * Change publication owner -- by name
2228 : */
2229 : ObjectAddress
2230 26 : AlterPublicationOwner(const char *name, Oid newOwnerId)
2231 : {
2232 : Oid pubid;
2233 : HeapTuple tup;
2234 : Relation rel;
2235 : ObjectAddress address;
2236 : Form_pg_publication pubform;
2237 :
2238 26 : rel = table_open(PublicationRelationId, RowExclusiveLock);
2239 :
2240 26 : tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
2241 :
2242 26 : if (!HeapTupleIsValid(tup))
2243 0 : ereport(ERROR,
2244 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2245 : errmsg("publication \"%s\" does not exist", name)));
2246 :
2247 26 : pubform = (Form_pg_publication) GETSTRUCT(tup);
2248 26 : pubid = pubform->oid;
2249 :
2250 26 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
2251 :
2252 22 : ObjectAddressSet(address, PublicationRelationId, pubid);
2253 :
2254 22 : heap_freetuple(tup);
2255 :
2256 22 : table_close(rel, RowExclusiveLock);
2257 :
2258 22 : return address;
2259 : }
2260 :
2261 : /*
2262 : * Change publication owner -- by OID
2263 : */
2264 : void
2265 0 : AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId)
2266 : {
2267 : HeapTuple tup;
2268 : Relation rel;
2269 :
2270 0 : rel = table_open(PublicationRelationId, RowExclusiveLock);
2271 :
2272 0 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
2273 :
2274 0 : if (!HeapTupleIsValid(tup))
2275 0 : ereport(ERROR,
2276 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2277 : errmsg("publication with OID %u does not exist", pubid)));
2278 :
2279 0 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
2280 :
2281 0 : heap_freetuple(tup);
2282 :
2283 0 : table_close(rel, RowExclusiveLock);
2284 0 : }
2285 :
2286 : /*
2287 : * Extract the publish_generated_columns option value from a DefElem. "stored"
2288 : * and "none" values are accepted.
2289 : */
2290 : static char
2291 49 : defGetGeneratedColsOption(DefElem *def)
2292 : {
2293 49 : char *sval = "";
2294 :
2295 : /*
2296 : * A parameter value is required.
2297 : */
2298 49 : if (def->arg)
2299 : {
2300 45 : sval = defGetString(def);
2301 :
2302 45 : if (pg_strcasecmp(sval, "none") == 0)
2303 14 : return PUBLISH_GENCOLS_NONE;
2304 31 : if (pg_strcasecmp(sval, "stored") == 0)
2305 27 : return PUBLISH_GENCOLS_STORED;
2306 : }
2307 :
2308 8 : ereport(ERROR,
2309 : errcode(ERRCODE_SYNTAX_ERROR),
2310 : errmsg("invalid value for publication parameter \"%s\": \"%s\"", def->defname, sval),
2311 : errdetail("Valid values are \"%s\" and \"%s\".", "none", "stored"));
2312 :
2313 : return PUBLISH_GENCOLS_NONE; /* keep compiler quiet */
2314 : }
|