Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * publicationcmds.c
4 : * publication manipulation
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/publicationcmds.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/htup_details.h"
18 : #include "access/table.h"
19 : #include "access/xact.h"
20 : #include "catalog/catalog.h"
21 : #include "catalog/indexing.h"
22 : #include "catalog/namespace.h"
23 : #include "catalog/objectaccess.h"
24 : #include "catalog/objectaddress.h"
25 : #include "catalog/pg_database.h"
26 : #include "catalog/pg_inherits.h"
27 : #include "catalog/pg_namespace.h"
28 : #include "catalog/pg_proc.h"
29 : #include "catalog/pg_publication.h"
30 : #include "catalog/pg_publication_namespace.h"
31 : #include "catalog/pg_publication_rel.h"
32 : #include "commands/defrem.h"
33 : #include "commands/event_trigger.h"
34 : #include "commands/publicationcmds.h"
35 : #include "miscadmin.h"
36 : #include "nodes/nodeFuncs.h"
37 : #include "parser/parse_clause.h"
38 : #include "parser/parse_collate.h"
39 : #include "parser/parse_relation.h"
40 : #include "rewrite/rewriteHandler.h"
41 : #include "storage/lmgr.h"
42 : #include "utils/acl.h"
43 : #include "utils/builtins.h"
44 : #include "utils/inval.h"
45 : #include "utils/lsyscache.h"
46 : #include "utils/rel.h"
47 : #include "utils/syscache.h"
48 : #include "utils/varlena.h"
49 :
50 :
51 : /*
52 : * Information used to validate the columns in the row filter expression. See
53 : * contain_invalid_rfcolumn_walker for details.
54 : */
55 : typedef struct rf_context
56 : {
57 : Bitmapset *bms_replident; /* bitset of replica identity columns */
58 : bool pubviaroot; /* true if we are validating the parent
59 : * relation's row filter */
60 : Oid relid; /* relid of the relation */
61 : Oid parentid; /* relid of the parent relation */
62 : } rf_context;
63 :
64 : static List *OpenTableList(List *tables);
65 : static void CloseTableList(List *rels);
66 : static void LockSchemaList(List *schemalist);
67 : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
68 : AlterPublicationStmt *stmt);
69 : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
70 : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
71 : AlterPublicationStmt *stmt);
72 : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
73 : static char defGetGeneratedColsOption(DefElem *def);
74 :
75 :
76 : static void
77 1012 : parse_publication_options(ParseState *pstate,
78 : List *options,
79 : bool *publish_given,
80 : PublicationActions *pubactions,
81 : bool *publish_via_partition_root_given,
82 : bool *publish_via_partition_root,
83 : bool *publish_generated_columns_given,
84 : char *publish_generated_columns)
85 : {
86 : ListCell *lc;
87 :
88 1012 : *publish_given = false;
89 1012 : *publish_via_partition_root_given = false;
90 1012 : *publish_generated_columns_given = false;
91 :
92 : /* defaults */
93 1012 : pubactions->pubinsert = true;
94 1012 : pubactions->pubupdate = true;
95 1012 : pubactions->pubdelete = true;
96 1012 : pubactions->pubtruncate = true;
97 1012 : *publish_via_partition_root = false;
98 1012 : *publish_generated_columns = PUBLISH_GENCOLS_NONE;
99 :
100 : /* Parse options */
101 1376 : foreach(lc, options)
102 : {
103 400 : DefElem *defel = (DefElem *) lfirst(lc);
104 :
105 400 : if (strcmp(defel->defname, "publish") == 0)
106 : {
107 : char *publish;
108 : List *publish_list;
109 : ListCell *lc2;
110 :
111 140 : if (*publish_given)
112 0 : errorConflictingDefElem(defel, pstate);
113 :
114 : /*
115 : * If publish option was given only the explicitly listed actions
116 : * should be published.
117 : */
118 140 : pubactions->pubinsert = false;
119 140 : pubactions->pubupdate = false;
120 140 : pubactions->pubdelete = false;
121 140 : pubactions->pubtruncate = false;
122 :
123 140 : *publish_given = true;
124 140 : publish = defGetString(defel);
125 :
126 140 : if (!SplitIdentifierString(publish, ',', &publish_list))
127 0 : ereport(ERROR,
128 : (errcode(ERRCODE_SYNTAX_ERROR),
129 : errmsg("invalid list syntax in parameter \"%s\"",
130 : "publish")));
131 :
132 : /* Process the option list. */
133 334 : foreach(lc2, publish_list)
134 : {
135 200 : char *publish_opt = (char *) lfirst(lc2);
136 :
137 200 : if (strcmp(publish_opt, "insert") == 0)
138 124 : pubactions->pubinsert = true;
139 76 : else if (strcmp(publish_opt, "update") == 0)
140 30 : pubactions->pubupdate = true;
141 46 : else if (strcmp(publish_opt, "delete") == 0)
142 20 : pubactions->pubdelete = true;
143 26 : else if (strcmp(publish_opt, "truncate") == 0)
144 20 : pubactions->pubtruncate = true;
145 : else
146 6 : ereport(ERROR,
147 : (errcode(ERRCODE_SYNTAX_ERROR),
148 : errmsg("unrecognized value for publication option \"%s\": \"%s\"",
149 : "publish", publish_opt)));
150 : }
151 : }
152 260 : else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
153 : {
154 172 : if (*publish_via_partition_root_given)
155 6 : errorConflictingDefElem(defel, pstate);
156 166 : *publish_via_partition_root_given = true;
157 166 : *publish_via_partition_root = defGetBoolean(defel);
158 : }
159 88 : else if (strcmp(defel->defname, "publish_generated_columns") == 0)
160 : {
161 82 : if (*publish_generated_columns_given)
162 6 : errorConflictingDefElem(defel, pstate);
163 76 : *publish_generated_columns_given = true;
164 76 : *publish_generated_columns = defGetGeneratedColsOption(defel);
165 : }
166 : else
167 6 : ereport(ERROR,
168 : (errcode(ERRCODE_SYNTAX_ERROR),
169 : errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
170 : }
171 976 : }
172 :
173 : /*
174 : * Convert the PublicationObjSpecType list into schema oid list and
175 : * PublicationTable list.
176 : */
177 : static void
178 1700 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
179 : List **rels, List **schemas)
180 : {
181 : ListCell *cell;
182 : PublicationObjSpec *pubobj;
183 :
184 1700 : if (!pubobjspec_list)
185 104 : return;
186 :
187 3388 : foreach(cell, pubobjspec_list)
188 : {
189 : Oid schemaid;
190 : List *search_path;
191 :
192 1828 : pubobj = (PublicationObjSpec *) lfirst(cell);
193 :
194 1828 : switch (pubobj->pubobjtype)
195 : {
196 1432 : case PUBLICATIONOBJ_TABLE:
197 1432 : *rels = lappend(*rels, pubobj->pubtable);
198 1432 : break;
199 372 : case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
200 372 : schemaid = get_namespace_oid(pubobj->name, false);
201 :
202 : /* Filter out duplicates if user specifies "sch1, sch1" */
203 342 : *schemas = list_append_unique_oid(*schemas, schemaid);
204 342 : break;
205 24 : case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
206 24 : search_path = fetch_search_path(false);
207 24 : if (search_path == NIL) /* nothing valid in search_path? */
208 6 : ereport(ERROR,
209 : errcode(ERRCODE_UNDEFINED_SCHEMA),
210 : errmsg("no schema has been selected for CURRENT_SCHEMA"));
211 :
212 18 : schemaid = linitial_oid(search_path);
213 18 : list_free(search_path);
214 :
215 : /* Filter out duplicates if user specifies "sch1, sch1" */
216 18 : *schemas = list_append_unique_oid(*schemas, schemaid);
217 18 : break;
218 0 : default:
219 : /* shouldn't happen */
220 0 : elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
221 : break;
222 : }
223 : }
224 : }
225 :
226 : /*
227 : * Returns true if any of the columns used in the row filter WHERE expression is
228 : * not part of REPLICA IDENTITY, false otherwise.
229 : */
230 : static bool
231 258 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
232 : {
233 258 : if (node == NULL)
234 0 : return false;
235 :
236 258 : if (IsA(node, Var))
237 : {
238 104 : Var *var = (Var *) node;
239 104 : AttrNumber attnum = var->varattno;
240 :
241 : /*
242 : * If pubviaroot is true, we are validating the row filter of the
243 : * parent table, but the bitmap contains the replica identity
244 : * information of the child table. So, get the column number of the
245 : * child table as parent and child column order could be different.
246 : */
247 104 : if (context->pubviaroot)
248 : {
249 16 : char *colname = get_attname(context->parentid, attnum, false);
250 :
251 16 : attnum = get_attnum(context->relid, colname);
252 : }
253 :
254 104 : if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
255 104 : context->bms_replident))
256 60 : return true;
257 : }
258 :
259 198 : return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
260 : context);
261 : }
262 :
263 : /*
264 : * Check if all columns referenced in the filter expression are part of the
265 : * REPLICA IDENTITY index or not.
266 : *
267 : * Returns true if any invalid column is found.
268 : */
269 : bool
270 728 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
271 : bool pubviaroot)
272 : {
273 : HeapTuple rftuple;
274 728 : Oid relid = RelationGetRelid(relation);
275 728 : Oid publish_as_relid = RelationGetRelid(relation);
276 728 : bool result = false;
277 : Datum rfdatum;
278 : bool rfisnull;
279 :
280 : /*
281 : * FULL means all columns are in the REPLICA IDENTITY, so all columns are
282 : * allowed in the row filter and we can skip the validation.
283 : */
284 728 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
285 198 : return false;
286 :
287 : /*
288 : * For a partition, if pubviaroot is true, find the topmost ancestor that
289 : * is published via this publication as we need to use its row filter
290 : * expression to filter the partition's changes.
291 : *
292 : * Note that even though the row filter used is for an ancestor, the
293 : * REPLICA IDENTITY used will be for the actual child table.
294 : */
295 530 : if (pubviaroot && relation->rd_rel->relispartition)
296 : {
297 : publish_as_relid
298 108 : = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
299 :
300 108 : if (!OidIsValid(publish_as_relid))
301 6 : publish_as_relid = relid;
302 : }
303 :
304 530 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
305 : ObjectIdGetDatum(publish_as_relid),
306 : ObjectIdGetDatum(pubid));
307 :
308 530 : if (!HeapTupleIsValid(rftuple))
309 56 : return false;
310 :
311 474 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
312 : Anum_pg_publication_rel_prqual,
313 : &rfisnull);
314 :
315 474 : if (!rfisnull)
316 : {
317 102 : rf_context context = {0};
318 : Node *rfnode;
319 102 : Bitmapset *bms = NULL;
320 :
321 102 : context.pubviaroot = pubviaroot;
322 102 : context.parentid = publish_as_relid;
323 102 : context.relid = relid;
324 :
325 : /* Remember columns that are part of the REPLICA IDENTITY */
326 102 : bms = RelationGetIndexAttrBitmap(relation,
327 : INDEX_ATTR_BITMAP_IDENTITY_KEY);
328 :
329 102 : context.bms_replident = bms;
330 102 : rfnode = stringToNode(TextDatumGetCString(rfdatum));
331 102 : result = contain_invalid_rfcolumn_walker(rfnode, &context);
332 : }
333 :
334 474 : ReleaseSysCache(rftuple);
335 :
336 474 : return result;
337 : }
338 :
339 : /*
340 : * Check for invalid columns in the publication table definition.
341 : *
342 : * This function evaluates two conditions:
343 : *
344 : * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
345 : * by the column list. If any column is missing, *invalid_column_list is set
346 : * to true.
347 : * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
348 : * are published, either by being explicitly named in the column list or, if
349 : * no column list is specified, by setting the option
350 : * publish_generated_columns to stored. If any unpublished
351 : * generated column is found, *invalid_gen_col is set to true.
352 : *
353 : * Returns true if any of the above conditions are not met.
354 : */
355 : bool
356 944 : pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
357 : bool pubviaroot, char pubgencols_type,
358 : bool *invalid_column_list,
359 : bool *invalid_gen_col)
360 : {
361 944 : Oid relid = RelationGetRelid(relation);
362 944 : Oid publish_as_relid = RelationGetRelid(relation);
363 : Bitmapset *idattrs;
364 944 : Bitmapset *columns = NULL;
365 944 : TupleDesc desc = RelationGetDescr(relation);
366 : Publication *pub;
367 : int x;
368 :
369 944 : *invalid_column_list = false;
370 944 : *invalid_gen_col = false;
371 :
372 : /*
373 : * For a partition, if pubviaroot is true, find the topmost ancestor that
374 : * is published via this publication as we need to use its column list for
375 : * the changes.
376 : *
377 : * Note that even though the column list used is for an ancestor, the
378 : * REPLICA IDENTITY used will be for the actual child table.
379 : */
380 944 : if (pubviaroot && relation->rd_rel->relispartition)
381 : {
382 160 : publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
383 :
384 160 : if (!OidIsValid(publish_as_relid))
385 36 : publish_as_relid = relid;
386 : }
387 :
388 : /* Fetch the column list */
389 944 : pub = GetPublication(pubid);
390 944 : check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
391 :
392 944 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
393 : {
394 : /* With REPLICA IDENTITY FULL, no column list is allowed. */
395 212 : *invalid_column_list = (columns != NULL);
396 :
397 : /*
398 : * As we don't allow a column list with REPLICA IDENTITY FULL, the
399 : * publish_generated_columns option must be set to stored if the table
400 : * has any stored generated columns.
401 : */
402 212 : if (pubgencols_type != PUBLISH_GENCOLS_STORED &&
403 200 : relation->rd_att->constr &&
404 88 : relation->rd_att->constr->has_generated_stored)
405 6 : *invalid_gen_col = true;
406 :
407 : /*
408 : * Virtual generated columns are currently not supported for logical
409 : * replication at all.
410 : */
411 212 : if (relation->rd_att->constr &&
412 100 : relation->rd_att->constr->has_generated_virtual)
413 12 : *invalid_gen_col = true;
414 :
415 212 : if (*invalid_gen_col && *invalid_column_list)
416 0 : return true;
417 : }
418 :
419 : /* Remember columns that are part of the REPLICA IDENTITY */
420 944 : idattrs = RelationGetIndexAttrBitmap(relation,
421 : INDEX_ATTR_BITMAP_IDENTITY_KEY);
422 :
423 : /*
424 : * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
425 : * (to handle system columns the usual way), while column list does not
426 : * use offset, so we can't do bms_is_subset(). Instead, we have to loop
427 : * over the idattrs and check all of them are in the list.
428 : */
429 944 : x = -1;
430 1602 : while ((x = bms_next_member(idattrs, x)) >= 0)
431 : {
432 664 : AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber);
433 664 : Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
434 :
435 664 : if (columns == NULL)
436 : {
437 : /*
438 : * The publish_generated_columns option must be set to stored if
439 : * the REPLICA IDENTITY contains any stored generated column.
440 : */
441 454 : if (att->attgenerated == ATTRIBUTE_GENERATED_STORED && pubgencols_type != PUBLISH_GENCOLS_STORED)
442 : {
443 6 : *invalid_gen_col = true;
444 6 : break;
445 : }
446 :
447 : /*
448 : * The equivalent setting for virtual generated columns does not
449 : * exist yet.
450 : */
451 448 : if (att->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
452 : {
453 0 : *invalid_gen_col = true;
454 0 : break;
455 : }
456 :
457 : /* Skip validating the column list since it is not defined */
458 448 : continue;
459 : }
460 :
461 : /*
462 : * If pubviaroot is true, we are validating the column list of the
463 : * parent table, but the bitmap contains the replica identity
464 : * information of the child table. The parent/child attnums may not
465 : * match, so translate them to the parent - get the attname from the
466 : * child, and look it up in the parent.
467 : */
468 210 : if (pubviaroot)
469 : {
470 : /* attribute name in the child table */
471 80 : char *colname = get_attname(relid, attnum, false);
472 :
473 : /*
474 : * Determine the attnum for the attribute name in parent (we are
475 : * using the column list defined on the parent).
476 : */
477 80 : attnum = get_attnum(publish_as_relid, colname);
478 : }
479 :
480 : /* replica identity column, not covered by the column list */
481 210 : *invalid_column_list |= !bms_is_member(attnum, columns);
482 :
483 210 : if (*invalid_column_list && *invalid_gen_col)
484 0 : break;
485 : }
486 :
487 944 : bms_free(columns);
488 944 : bms_free(idattrs);
489 :
490 944 : return *invalid_column_list || *invalid_gen_col;
491 : }
492 :
493 : /*
494 : * Invalidate entries in the RelationSyncCache for relations included in the
495 : * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
496 : *
497 : * If 'puballtables' is true, invalidate all cache entries.
498 : */
499 : void
500 36 : InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
501 : {
502 36 : if (puballtables)
503 : {
504 6 : CacheInvalidateRelSyncAll();
505 : }
506 : else
507 : {
508 30 : List *relids = NIL;
509 30 : List *schemarelids = NIL;
510 :
511 : /*
512 : * For partitioned tables, we must invalidate all partitions and
513 : * itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as
514 : * a target. However, WAL records for TRUNCATE specify both a root and
515 : * its leaves.
516 : */
517 30 : relids = GetPublicationRelations(pubid,
518 : PUBLICATION_PART_ALL);
519 30 : schemarelids = GetAllSchemaPublicationRelations(pubid,
520 : PUBLICATION_PART_ALL);
521 :
522 30 : relids = list_concat_unique_oid(relids, schemarelids);
523 :
524 : /* Invalidate the relsyncache */
525 66 : foreach_oid(relid, relids)
526 6 : CacheInvalidateRelSync(relid);
527 : }
528 :
529 36 : return;
530 : }
531 :
532 : /* check_functions_in_node callback */
533 : static bool
534 436 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
535 : {
536 436 : return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
537 : func_id >= FirstNormalObjectId);
538 : }
539 :
540 : /*
541 : * The row filter walker checks if the row filter expression is a "simple
542 : * expression".
543 : *
544 : * It allows only simple or compound expressions such as:
545 : * - (Var Op Const)
546 : * - (Var Op Var)
547 : * - (Var Op Const) AND/OR (Var Op Const)
548 : * - etc
549 : * (where Var is a column of the table this filter belongs to)
550 : *
551 : * The simple expression has the following restrictions:
552 : * - User-defined operators are not allowed;
553 : * - User-defined functions are not allowed;
554 : * - User-defined types are not allowed;
555 : * - User-defined collations are not allowed;
556 : * - Non-immutable built-in functions are not allowed;
557 : * - System columns are not allowed.
558 : *
559 : * NOTES
560 : *
561 : * We don't allow user-defined functions/operators/types/collations because
562 : * (a) if a user drops a user-defined object used in a row filter expression or
563 : * if there is any other error while using it, the logical decoding
564 : * infrastructure won't be able to recover from such an error even if the
565 : * object is recreated again because a historic snapshot is used to evaluate
566 : * the row filter;
567 : * (b) a user-defined function can be used to access tables that could have
568 : * unpleasant results because a historic snapshot is used. That's why only
569 : * immutable built-in functions are allowed in row filter expressions.
570 : *
571 : * We don't allow system columns because currently, we don't have that
572 : * information in the tuple passed to downstream. Also, as we don't replicate
573 : * those to subscribers, there doesn't seem to be a need for a filter on those
574 : * columns.
575 : *
576 : * We can allow other node types after more analysis and testing.
577 : */
578 : static bool
579 1440 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
580 : {
581 1440 : char *errdetail_msg = NULL;
582 :
583 1440 : if (node == NULL)
584 6 : return false;
585 :
586 1434 : switch (nodeTag(node))
587 : {
588 388 : case T_Var:
589 : /* System columns are not allowed. */
590 388 : if (((Var *) node)->varattno < InvalidAttrNumber)
591 6 : errdetail_msg = _("System columns are not allowed.");
592 388 : break;
593 392 : case T_OpExpr:
594 : case T_DistinctExpr:
595 : case T_NullIfExpr:
596 : /* OK, except user-defined operators are not allowed. */
597 392 : if (((OpExpr *) node)->opno >= FirstNormalObjectId)
598 6 : errdetail_msg = _("User-defined operators are not allowed.");
599 392 : break;
600 6 : case T_ScalarArrayOpExpr:
601 : /* OK, except user-defined operators are not allowed. */
602 6 : if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
603 0 : errdetail_msg = _("User-defined operators are not allowed.");
604 :
605 : /*
606 : * We don't need to check the hashfuncid and negfuncid of
607 : * ScalarArrayOpExpr as those functions are only built for a
608 : * subquery.
609 : */
610 6 : break;
611 6 : case T_RowCompareExpr:
612 : {
613 : ListCell *opid;
614 :
615 : /* OK, except user-defined operators are not allowed. */
616 18 : foreach(opid, ((RowCompareExpr *) node)->opnos)
617 : {
618 12 : if (lfirst_oid(opid) >= FirstNormalObjectId)
619 : {
620 0 : errdetail_msg = _("User-defined operators are not allowed.");
621 0 : break;
622 : }
623 : }
624 : }
625 6 : break;
626 636 : case T_Const:
627 : case T_FuncExpr:
628 : case T_BoolExpr:
629 : case T_RelabelType:
630 : case T_CollateExpr:
631 : case T_CaseExpr:
632 : case T_CaseTestExpr:
633 : case T_ArrayExpr:
634 : case T_RowExpr:
635 : case T_CoalesceExpr:
636 : case T_MinMaxExpr:
637 : case T_XmlExpr:
638 : case T_NullTest:
639 : case T_BooleanTest:
640 : case T_List:
641 : /* OK, supported */
642 636 : break;
643 6 : default:
644 6 : errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
645 6 : break;
646 : }
647 :
648 : /*
649 : * For all the supported nodes, if we haven't already found a problem,
650 : * check the types, functions, and collations used in it. We check List
651 : * by walking through each element.
652 : */
653 1434 : if (!errdetail_msg && !IsA(node, List))
654 : {
655 1362 : if (exprType(node) >= FirstNormalObjectId)
656 6 : errdetail_msg = _("User-defined types are not allowed.");
657 1356 : else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
658 : pstate))
659 12 : errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
660 2688 : else if (exprCollation(node) >= FirstNormalObjectId ||
661 1344 : exprInputCollation(node) >= FirstNormalObjectId)
662 6 : errdetail_msg = _("User-defined collations are not allowed.");
663 : }
664 :
665 : /*
666 : * If we found a problem in this node, throw error now. Otherwise keep
667 : * going.
668 : */
669 1434 : if (errdetail_msg)
670 42 : ereport(ERROR,
671 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
672 : errmsg("invalid publication WHERE expression"),
673 : errdetail_internal("%s", errdetail_msg),
674 : parser_errposition(pstate, exprLocation(node))));
675 :
676 1392 : return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
677 : pstate);
678 : }
679 :
680 : /*
681 : * Check if the row filter expression is a "simple expression".
682 : *
683 : * See check_simple_rowfilter_expr_walker for details.
684 : */
685 : static bool
686 376 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
687 : {
688 376 : return check_simple_rowfilter_expr_walker(node, pstate);
689 : }
690 :
691 : /*
692 : * Transform the publication WHERE expression for all the relations in the list,
693 : * ensuring it is coerced to boolean and necessary collation information is
694 : * added if required, and add a new nsitem/RTE for the associated relation to
695 : * the ParseState's namespace list.
696 : *
697 : * Also check the publication row filter expression and throw an error if
698 : * anything not permitted or unexpected is encountered.
699 : */
700 : static void
701 1186 : TransformPubWhereClauses(List *tables, const char *queryString,
702 : bool pubviaroot)
703 : {
704 : ListCell *lc;
705 :
706 2388 : foreach(lc, tables)
707 : {
708 : ParseNamespaceItem *nsitem;
709 1262 : Node *whereclause = NULL;
710 : ParseState *pstate;
711 1262 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
712 :
713 1262 : if (pri->whereClause == NULL)
714 868 : continue;
715 :
716 : /*
717 : * If the publication doesn't publish changes via the root partitioned
718 : * table, the partition's row filter will be used. So disallow using
719 : * WHERE clause on partitioned table in this case.
720 : */
721 394 : if (!pubviaroot &&
722 364 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
723 6 : ereport(ERROR,
724 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
725 : errmsg("cannot use publication WHERE clause for relation \"%s\"",
726 : RelationGetRelationName(pri->relation)),
727 : errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
728 : "publish_via_partition_root")));
729 :
730 : /*
731 : * A fresh pstate is required so that we only have "this" table in its
732 : * rangetable
733 : */
734 388 : pstate = make_parsestate(NULL);
735 388 : pstate->p_sourcetext = queryString;
736 388 : nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
737 : AccessShareLock, NULL,
738 : false, false);
739 388 : addNSItemToQuery(pstate, nsitem, false, true, true);
740 :
741 388 : whereclause = transformWhereClause(pstate,
742 388 : copyObject(pri->whereClause),
743 : EXPR_KIND_WHERE,
744 : "PUBLICATION WHERE");
745 :
746 : /* Fix up collation information */
747 376 : assign_expr_collations(pstate, whereclause);
748 :
749 376 : whereclause = expand_generated_columns_in_expr(whereclause, pri->relation, 1);
750 :
751 : /*
752 : * We allow only simple expressions in row filters. See
753 : * check_simple_rowfilter_expr_walker.
754 : */
755 376 : check_simple_rowfilter_expr(whereclause, pstate);
756 :
757 334 : free_parsestate(pstate);
758 :
759 334 : pri->whereClause = whereclause;
760 : }
761 1126 : }
762 :
763 :
764 : /*
765 : * Given a list of tables that are going to be added to a publication,
766 : * verify that they fulfill the necessary preconditions, namely: no tables
767 : * have a column list if any schema is published; and partitioned tables do
768 : * not have column lists if publish_via_partition_root is not set.
769 : *
770 : * 'publish_schema' indicates that the publication contains any TABLES IN
771 : * SCHEMA elements (newly added in this command, or preexisting).
772 : * 'pubviaroot' is the value of publish_via_partition_root.
773 : */
774 : static void
775 1126 : CheckPubRelationColumnList(char *pubname, List *tables,
776 : bool publish_schema, bool pubviaroot)
777 : {
778 : ListCell *lc;
779 :
780 2298 : foreach(lc, tables)
781 : {
782 1202 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
783 :
784 1202 : if (pri->columns == NIL)
785 798 : continue;
786 :
787 : /*
788 : * Disallow specifying column list if any schema is in the
789 : * publication.
790 : *
791 : * XXX We could instead just forbid the case when the publication
792 : * tries to publish the table with a column list and a schema for that
793 : * table. However, if we do that then we need a restriction during
794 : * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
795 : * seem to be a good idea.
796 : */
797 404 : if (publish_schema)
798 24 : ereport(ERROR,
799 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
800 : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
801 : get_namespace_name(RelationGetNamespace(pri->relation)),
802 : RelationGetRelationName(pri->relation), pubname),
803 : errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
804 :
805 : /*
806 : * If the publication doesn't publish changes via the root partitioned
807 : * table, the partition's column list will be used. So disallow using
808 : * a column list on the partitioned table in this case.
809 : */
810 380 : if (!pubviaroot &&
811 304 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
812 6 : ereport(ERROR,
813 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
814 : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
815 : get_namespace_name(RelationGetNamespace(pri->relation)),
816 : RelationGetRelationName(pri->relation), pubname),
817 : errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
818 : "publish_via_partition_root")));
819 : }
820 1096 : }
821 :
822 : /*
823 : * Create new publication.
824 : */
825 : ObjectAddress
826 896 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
827 : {
828 : Relation rel;
829 : ObjectAddress myself;
830 : Oid puboid;
831 : bool nulls[Natts_pg_publication];
832 : Datum values[Natts_pg_publication];
833 : HeapTuple tup;
834 : bool publish_given;
835 : PublicationActions pubactions;
836 : bool publish_via_partition_root_given;
837 : bool publish_via_partition_root;
838 : bool publish_generated_columns_given;
839 : char publish_generated_columns;
840 : AclResult aclresult;
841 896 : List *relations = NIL;
842 896 : List *schemaidlist = NIL;
843 :
844 : /* must have CREATE privilege on database */
845 896 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
846 896 : if (aclresult != ACLCHECK_OK)
847 6 : aclcheck_error(aclresult, OBJECT_DATABASE,
848 6 : get_database_name(MyDatabaseId));
849 :
850 : /* FOR ALL TABLES and FOR ALL SEQUENCES requires superuser */
851 890 : if (!superuser())
852 : {
853 24 : if (stmt->for_all_tables || stmt->for_all_sequences)
854 0 : ereport(ERROR,
855 : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
856 : errmsg("must be superuser to create a FOR ALL TABLES or ALL SEQUENCES publication"));
857 : }
858 :
859 890 : rel = table_open(PublicationRelationId, RowExclusiveLock);
860 :
861 : /* Check if name is used */
862 890 : puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
863 : CStringGetDatum(stmt->pubname));
864 890 : if (OidIsValid(puboid))
865 6 : ereport(ERROR,
866 : (errcode(ERRCODE_DUPLICATE_OBJECT),
867 : errmsg("publication \"%s\" already exists",
868 : stmt->pubname)));
869 :
870 : /* Form a tuple. */
871 884 : memset(values, 0, sizeof(values));
872 884 : memset(nulls, false, sizeof(nulls));
873 :
874 884 : values[Anum_pg_publication_pubname - 1] =
875 884 : DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
876 884 : values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
877 :
878 884 : parse_publication_options(pstate,
879 : stmt->options,
880 : &publish_given, &pubactions,
881 : &publish_via_partition_root_given,
882 : &publish_via_partition_root,
883 : &publish_generated_columns_given,
884 : &publish_generated_columns);
885 :
886 848 : if (stmt->for_all_sequences &&
887 28 : (publish_given || publish_via_partition_root_given ||
888 : publish_generated_columns_given))
889 2 : ereport(NOTICE,
890 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
891 : errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
892 :
893 848 : puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
894 : Anum_pg_publication_oid);
895 848 : values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
896 848 : values[Anum_pg_publication_puballtables - 1] =
897 848 : BoolGetDatum(stmt->for_all_tables);
898 848 : values[Anum_pg_publication_puballsequences - 1] =
899 848 : BoolGetDatum(stmt->for_all_sequences);
900 848 : values[Anum_pg_publication_pubinsert - 1] =
901 848 : BoolGetDatum(pubactions.pubinsert);
902 848 : values[Anum_pg_publication_pubupdate - 1] =
903 848 : BoolGetDatum(pubactions.pubupdate);
904 848 : values[Anum_pg_publication_pubdelete - 1] =
905 848 : BoolGetDatum(pubactions.pubdelete);
906 848 : values[Anum_pg_publication_pubtruncate - 1] =
907 848 : BoolGetDatum(pubactions.pubtruncate);
908 848 : values[Anum_pg_publication_pubviaroot - 1] =
909 848 : BoolGetDatum(publish_via_partition_root);
910 848 : values[Anum_pg_publication_pubgencols - 1] =
911 848 : CharGetDatum(publish_generated_columns);
912 :
913 848 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
914 :
915 : /* Insert tuple into catalog. */
916 848 : CatalogTupleInsert(rel, tup);
917 848 : heap_freetuple(tup);
918 :
919 848 : recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
920 :
921 848 : ObjectAddressSet(myself, PublicationRelationId, puboid);
922 :
923 : /* Make the changes visible. */
924 848 : CommandCounterIncrement();
925 :
926 : /* Associate objects with the publication. */
927 848 : if (stmt->for_all_tables)
928 : {
929 : /*
930 : * Invalidate relcache so that publication info is rebuilt. Sequences
931 : * publication doesn't require invalidation, as replica identity
932 : * checks don't apply to them.
933 : */
934 106 : CacheInvalidateRelcacheAll();
935 : }
936 742 : else if (!stmt->for_all_sequences)
937 : {
938 722 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
939 : &schemaidlist);
940 :
941 : /* FOR TABLES IN SCHEMA requires superuser */
942 704 : if (schemaidlist != NIL && !superuser())
943 6 : ereport(ERROR,
944 : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
945 : errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
946 :
947 698 : if (relations != NIL)
948 : {
949 : List *rels;
950 :
951 474 : rels = OpenTableList(relations);
952 444 : TransformPubWhereClauses(rels, pstate->p_sourcetext,
953 : publish_via_partition_root);
954 :
955 420 : CheckPubRelationColumnList(stmt->pubname, rels,
956 : schemaidlist != NIL,
957 : publish_via_partition_root);
958 :
959 414 : PublicationAddTables(puboid, rels, true, NULL);
960 388 : CloseTableList(rels);
961 : }
962 :
963 612 : if (schemaidlist != NIL)
964 : {
965 : /*
966 : * Schema lock is held until the publication is created to prevent
967 : * concurrent schema deletion.
968 : */
969 152 : LockSchemaList(schemaidlist);
970 152 : PublicationAddSchemas(puboid, schemaidlist, true, NULL);
971 : }
972 : }
973 :
974 732 : table_close(rel, RowExclusiveLock);
975 :
976 732 : InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
977 :
978 : /*
979 : * We don't need this warning message when wal_level >= 'replica' since
980 : * logical decoding is automatically enabled upon logical slot
981 : * creation.
982 : */
983 732 : if (wal_level < WAL_LEVEL_REPLICA)
984 26 : ereport(WARNING,
985 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
986 : errmsg("logical decoding must be enabled to publish logical changes"),
987 : errhint("Before creating subscriptions, ensure that \"wal_level\" is set to \"replica\" or higher.")));
988 :
989 732 : return myself;
990 : }
991 :
992 : /*
993 : * Change options of a publication.
994 : */
995 : static void
996 128 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
997 : Relation rel, HeapTuple tup)
998 : {
999 : bool nulls[Natts_pg_publication];
1000 : bool replaces[Natts_pg_publication];
1001 : Datum values[Natts_pg_publication];
1002 : bool publish_given;
1003 : PublicationActions pubactions;
1004 : bool publish_via_partition_root_given;
1005 : bool publish_via_partition_root;
1006 : bool publish_generated_columns_given;
1007 : char publish_generated_columns;
1008 : ObjectAddress obj;
1009 : Form_pg_publication pubform;
1010 128 : List *root_relids = NIL;
1011 : ListCell *lc;
1012 :
1013 128 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1014 :
1015 128 : parse_publication_options(pstate,
1016 : stmt->options,
1017 : &publish_given, &pubactions,
1018 : &publish_via_partition_root_given,
1019 : &publish_via_partition_root,
1020 : &publish_generated_columns_given,
1021 : &publish_generated_columns);
1022 :
1023 128 : if (pubform->puballsequences &&
1024 12 : (publish_given || publish_via_partition_root_given ||
1025 : publish_generated_columns_given))
1026 12 : ereport(NOTICE,
1027 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1028 : errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
1029 :
1030 : /*
1031 : * If the publication doesn't publish changes via the root partitioned
1032 : * table, the partition's row filter and column list will be used. So
1033 : * disallow using WHERE clause and column lists on partitioned table in
1034 : * this case.
1035 : */
1036 128 : if (!pubform->puballtables && publish_via_partition_root_given &&
1037 80 : !publish_via_partition_root)
1038 : {
1039 : /*
1040 : * Lock the publication so nobody else can do anything with it. This
1041 : * prevents concurrent alter to add partitioned table(s) with WHERE
1042 : * clause(s) and/or column lists which we don't allow when not
1043 : * publishing via root.
1044 : */
1045 48 : LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
1046 : AccessShareLock);
1047 :
1048 48 : root_relids = GetPublicationRelations(pubform->oid,
1049 : PUBLICATION_PART_ROOT);
1050 :
1051 84 : foreach(lc, root_relids)
1052 : {
1053 48 : Oid relid = lfirst_oid(lc);
1054 : HeapTuple rftuple;
1055 : char relkind;
1056 : char *relname;
1057 : bool has_rowfilter;
1058 : bool has_collist;
1059 :
1060 : /*
1061 : * Beware: we don't have lock on the relations, so cope silently
1062 : * with the cache lookups returning NULL.
1063 : */
1064 :
1065 48 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1066 : ObjectIdGetDatum(relid),
1067 : ObjectIdGetDatum(pubform->oid));
1068 48 : if (!HeapTupleIsValid(rftuple))
1069 0 : continue;
1070 48 : has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
1071 48 : has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
1072 48 : if (!has_rowfilter && !has_collist)
1073 : {
1074 12 : ReleaseSysCache(rftuple);
1075 12 : continue;
1076 : }
1077 :
1078 36 : relkind = get_rel_relkind(relid);
1079 36 : if (relkind != RELKIND_PARTITIONED_TABLE)
1080 : {
1081 24 : ReleaseSysCache(rftuple);
1082 24 : continue;
1083 : }
1084 12 : relname = get_rel_name(relid);
1085 12 : if (relname == NULL) /* table concurrently dropped */
1086 : {
1087 0 : ReleaseSysCache(rftuple);
1088 0 : continue;
1089 : }
1090 :
1091 12 : if (has_rowfilter)
1092 6 : ereport(ERROR,
1093 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1094 : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1095 : "publish_via_partition_root",
1096 : stmt->pubname),
1097 : errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1098 : relname, "publish_via_partition_root")));
1099 : Assert(has_collist);
1100 6 : ereport(ERROR,
1101 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1102 : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1103 : "publish_via_partition_root",
1104 : stmt->pubname),
1105 : errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1106 : relname, "publish_via_partition_root")));
1107 : }
1108 : }
1109 :
1110 : /* Everything ok, form a new tuple. */
1111 116 : memset(values, 0, sizeof(values));
1112 116 : memset(nulls, false, sizeof(nulls));
1113 116 : memset(replaces, false, sizeof(replaces));
1114 :
1115 116 : if (publish_given)
1116 : {
1117 34 : values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
1118 34 : replaces[Anum_pg_publication_pubinsert - 1] = true;
1119 :
1120 34 : values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
1121 34 : replaces[Anum_pg_publication_pubupdate - 1] = true;
1122 :
1123 34 : values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
1124 34 : replaces[Anum_pg_publication_pubdelete - 1] = true;
1125 :
1126 34 : values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
1127 34 : replaces[Anum_pg_publication_pubtruncate - 1] = true;
1128 : }
1129 :
1130 116 : if (publish_via_partition_root_given)
1131 : {
1132 70 : values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
1133 70 : replaces[Anum_pg_publication_pubviaroot - 1] = true;
1134 : }
1135 :
1136 116 : if (publish_generated_columns_given)
1137 : {
1138 12 : values[Anum_pg_publication_pubgencols - 1] = CharGetDatum(publish_generated_columns);
1139 12 : replaces[Anum_pg_publication_pubgencols - 1] = true;
1140 : }
1141 :
1142 116 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1143 : replaces);
1144 :
1145 : /* Update the catalog. */
1146 116 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1147 :
1148 116 : CommandCounterIncrement();
1149 :
1150 116 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1151 :
1152 : /* Invalidate the relcache. */
1153 116 : if (pubform->puballtables)
1154 : {
1155 20 : CacheInvalidateRelcacheAll();
1156 : }
1157 : else
1158 : {
1159 96 : List *relids = NIL;
1160 96 : List *schemarelids = NIL;
1161 :
1162 : /*
1163 : * For any partitioned tables contained in the publication, we must
1164 : * invalidate all partitions contained in the respective partition
1165 : * trees, not just those explicitly mentioned in the publication.
1166 : */
1167 96 : if (root_relids == NIL)
1168 60 : relids = GetPublicationRelations(pubform->oid,
1169 : PUBLICATION_PART_ALL);
1170 : else
1171 : {
1172 : /*
1173 : * We already got tables explicitly mentioned in the publication.
1174 : * Now get all partitions for the partitioned table in the list.
1175 : */
1176 72 : foreach(lc, root_relids)
1177 36 : relids = GetPubPartitionOptionRelations(relids,
1178 : PUBLICATION_PART_ALL,
1179 : lfirst_oid(lc));
1180 : }
1181 :
1182 96 : schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
1183 : PUBLICATION_PART_ALL);
1184 96 : relids = list_concat_unique_oid(relids, schemarelids);
1185 :
1186 96 : InvalidatePublicationRels(relids);
1187 : }
1188 :
1189 116 : ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
1190 116 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1191 : (Node *) stmt);
1192 :
1193 116 : InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
1194 116 : }
1195 :
1196 : /*
1197 : * Invalidate the relations.
1198 : */
1199 : void
1200 2450 : InvalidatePublicationRels(List *relids)
1201 : {
1202 : /*
1203 : * We don't want to send too many individual messages, at some point it's
1204 : * cheaper to just reset whole relcache.
1205 : */
1206 2450 : if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
1207 : {
1208 : ListCell *lc;
1209 :
1210 20654 : foreach(lc, relids)
1211 18204 : CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
1212 : }
1213 : else
1214 0 : CacheInvalidateRelcacheAll();
1215 2450 : }
1216 :
1217 : /*
1218 : * Add or remove table to/from publication.
1219 : */
1220 : static void
1221 918 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
1222 : List *tables, const char *queryString,
1223 : bool publish_schema)
1224 : {
1225 918 : List *rels = NIL;
1226 918 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1227 918 : Oid pubid = pubform->oid;
1228 :
1229 : /*
1230 : * Nothing to do if no objects, except in SET: for that it is quite
1231 : * possible that user has not specified any tables in which case we need
1232 : * to remove all the existing tables.
1233 : */
1234 918 : if (!tables && stmt->action != AP_SetObjects)
1235 76 : return;
1236 :
1237 842 : rels = OpenTableList(tables);
1238 :
1239 842 : if (stmt->action == AP_AddObjects)
1240 : {
1241 296 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1242 :
1243 278 : publish_schema |= is_schema_publication(pubid);
1244 :
1245 278 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1246 278 : pubform->pubviaroot);
1247 :
1248 266 : PublicationAddTables(pubid, rels, false, stmt);
1249 : }
1250 546 : else if (stmt->action == AP_DropObjects)
1251 100 : PublicationDropTables(pubid, rels, false);
1252 : else /* AP_SetObjects */
1253 : {
1254 446 : List *oldrelids = GetPublicationRelations(pubid,
1255 : PUBLICATION_PART_ROOT);
1256 446 : List *delrels = NIL;
1257 : ListCell *oldlc;
1258 :
1259 446 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1260 :
1261 428 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1262 428 : pubform->pubviaroot);
1263 :
1264 : /*
1265 : * To recreate the relation list for the publication, look for
1266 : * existing relations that do not need to be dropped.
1267 : */
1268 806 : foreach(oldlc, oldrelids)
1269 : {
1270 402 : Oid oldrelid = lfirst_oid(oldlc);
1271 : ListCell *newlc;
1272 : PublicationRelInfo *oldrel;
1273 402 : bool found = false;
1274 : HeapTuple rftuple;
1275 402 : Node *oldrelwhereclause = NULL;
1276 402 : Bitmapset *oldcolumns = NULL;
1277 :
1278 : /* look up the cache for the old relmap */
1279 402 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1280 : ObjectIdGetDatum(oldrelid),
1281 : ObjectIdGetDatum(pubid));
1282 :
1283 : /*
1284 : * See if the existing relation currently has a WHERE clause or a
1285 : * column list. We need to compare those too.
1286 : */
1287 402 : if (HeapTupleIsValid(rftuple))
1288 : {
1289 402 : bool isnull = true;
1290 : Datum whereClauseDatum;
1291 : Datum columnListDatum;
1292 :
1293 : /* Load the WHERE clause for this table. */
1294 402 : whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1295 : Anum_pg_publication_rel_prqual,
1296 : &isnull);
1297 402 : if (!isnull)
1298 210 : oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
1299 :
1300 : /* Transform the int2vector column list to a bitmap. */
1301 402 : columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1302 : Anum_pg_publication_rel_prattrs,
1303 : &isnull);
1304 :
1305 402 : if (!isnull)
1306 136 : oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
1307 :
1308 402 : ReleaseSysCache(rftuple);
1309 : }
1310 :
1311 778 : foreach(newlc, rels)
1312 : {
1313 : PublicationRelInfo *newpubrel;
1314 : Oid newrelid;
1315 404 : Bitmapset *newcolumns = NULL;
1316 :
1317 404 : newpubrel = (PublicationRelInfo *) lfirst(newlc);
1318 404 : newrelid = RelationGetRelid(newpubrel->relation);
1319 :
1320 : /*
1321 : * Validate the column list. If the column list or WHERE
1322 : * clause changes, then the validation done here will be
1323 : * duplicated inside PublicationAddTables(). The validation
1324 : * is cheap enough that that seems harmless.
1325 : */
1326 404 : newcolumns = pub_collist_validate(newpubrel->relation,
1327 : newpubrel->columns);
1328 :
1329 : /*
1330 : * Check if any of the new set of relations matches with the
1331 : * existing relations in the publication. Additionally, if the
1332 : * relation has an associated WHERE clause, check the WHERE
1333 : * expressions also match. Same for the column list. Drop the
1334 : * rest.
1335 : */
1336 392 : if (newrelid == oldrelid)
1337 : {
1338 284 : if (equal(oldrelwhereclause, newpubrel->whereClause) &&
1339 80 : bms_equal(oldcolumns, newcolumns))
1340 : {
1341 16 : found = true;
1342 16 : break;
1343 : }
1344 : }
1345 : }
1346 :
1347 : /*
1348 : * Add the non-matched relations to a list so that they can be
1349 : * dropped.
1350 : */
1351 390 : if (!found)
1352 : {
1353 374 : oldrel = palloc_object(PublicationRelInfo);
1354 374 : oldrel->whereClause = NULL;
1355 374 : oldrel->columns = NIL;
1356 374 : oldrel->relation = table_open(oldrelid,
1357 : ShareUpdateExclusiveLock);
1358 374 : delrels = lappend(delrels, oldrel);
1359 : }
1360 : }
1361 :
1362 : /* And drop them. */
1363 404 : PublicationDropTables(pubid, delrels, true);
1364 :
1365 : /*
1366 : * Don't bother calculating the difference for adding, we'll catch and
1367 : * skip existing ones when doing catalog update.
1368 : */
1369 404 : PublicationAddTables(pubid, rels, true, stmt);
1370 :
1371 404 : CloseTableList(delrels);
1372 : }
1373 :
1374 710 : CloseTableList(rels);
1375 : }
1376 :
1377 : /*
1378 : * Alter the publication schemas.
1379 : *
1380 : * Add or remove schemas to/from publication.
1381 : */
1382 : static void
1383 786 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
1384 : HeapTuple tup, List *schemaidlist)
1385 : {
1386 786 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1387 :
1388 : /*
1389 : * Nothing to do if no objects, except in SET: for that it is quite
1390 : * possible that user has not specified any schemas in which case we need
1391 : * to remove all the existing schemas.
1392 : */
1393 786 : if (!schemaidlist && stmt->action != AP_SetObjects)
1394 306 : return;
1395 :
1396 : /*
1397 : * Schema lock is held until the publication is altered to prevent
1398 : * concurrent schema deletion.
1399 : */
1400 480 : LockSchemaList(schemaidlist);
1401 480 : if (stmt->action == AP_AddObjects)
1402 : {
1403 : ListCell *lc;
1404 : List *reloids;
1405 :
1406 38 : reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
1407 :
1408 54 : foreach(lc, reloids)
1409 : {
1410 : HeapTuple coltuple;
1411 :
1412 22 : coltuple = SearchSysCache2(PUBLICATIONRELMAP,
1413 : ObjectIdGetDatum(lfirst_oid(lc)),
1414 : ObjectIdGetDatum(pubform->oid));
1415 :
1416 22 : if (!HeapTupleIsValid(coltuple))
1417 0 : continue;
1418 :
1419 : /*
1420 : * Disallow adding schema if column list is already part of the
1421 : * publication. See CheckPubRelationColumnList.
1422 : */
1423 22 : if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
1424 6 : ereport(ERROR,
1425 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1426 : errmsg("cannot add schema to publication \"%s\"",
1427 : stmt->pubname),
1428 : errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1429 :
1430 16 : ReleaseSysCache(coltuple);
1431 : }
1432 :
1433 32 : PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
1434 : }
1435 442 : else if (stmt->action == AP_DropObjects)
1436 38 : PublicationDropSchemas(pubform->oid, schemaidlist, false);
1437 : else /* AP_SetObjects */
1438 : {
1439 404 : List *oldschemaids = GetPublicationSchemas(pubform->oid);
1440 404 : List *delschemas = NIL;
1441 :
1442 : /* Identify which schemas should be dropped */
1443 404 : delschemas = list_difference_oid(oldschemaids, schemaidlist);
1444 :
1445 : /*
1446 : * Schema lock is held until the publication is altered to prevent
1447 : * concurrent schema deletion.
1448 : */
1449 404 : LockSchemaList(delschemas);
1450 :
1451 : /* And drop them */
1452 404 : PublicationDropSchemas(pubform->oid, delschemas, true);
1453 :
1454 : /*
1455 : * Don't bother calculating the difference for adding, we'll catch and
1456 : * skip existing ones when doing catalog update.
1457 : */
1458 404 : PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
1459 : }
1460 : }
1461 :
1462 : /*
1463 : * Check if relations and schemas can be in a given publication and throw
1464 : * appropriate error if not.
1465 : */
1466 : static void
1467 960 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
1468 : List *tables, List *schemaidlist)
1469 : {
1470 960 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1471 :
1472 960 : if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
1473 104 : schemaidlist && !superuser())
1474 6 : ereport(ERROR,
1475 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1476 : errmsg("must be superuser to add or set schemas")));
1477 :
1478 : /*
1479 : * Check that user is allowed to manipulate the publication tables in
1480 : * schema
1481 : */
1482 954 : if (schemaidlist && (pubform->puballtables || pubform->puballsequences))
1483 : {
1484 18 : if (pubform->puballtables && pubform->puballsequences)
1485 0 : ereport(ERROR,
1486 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1487 : errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
1488 : NameStr(pubform->pubname)),
1489 : errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
1490 18 : else if (pubform->puballtables)
1491 18 : ereport(ERROR,
1492 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1493 : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1494 : NameStr(pubform->pubname)),
1495 : errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications."));
1496 : else
1497 0 : ereport(ERROR,
1498 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1499 : errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
1500 : NameStr(pubform->pubname)),
1501 : errdetail("Schemas cannot be added to or dropped from FOR ALL SEQUENCES publications."));
1502 : }
1503 :
1504 : /* Check that user is allowed to manipulate the publication tables. */
1505 936 : if (tables && (pubform->puballtables || pubform->puballsequences))
1506 : {
1507 18 : if (pubform->puballtables && pubform->puballsequences)
1508 0 : ereport(ERROR,
1509 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1510 : errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
1511 : NameStr(pubform->pubname)),
1512 : errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
1513 18 : else if (pubform->puballtables)
1514 18 : ereport(ERROR,
1515 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1516 : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1517 : NameStr(pubform->pubname)),
1518 : errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES publications."));
1519 : else
1520 0 : ereport(ERROR,
1521 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1522 : errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
1523 : NameStr(pubform->pubname)),
1524 : errdetail("Tables or sequences cannot be added to or dropped from FOR ALL SEQUENCES publications."));
1525 : }
1526 918 : }
1527 :
1528 : /*
1529 : * Alter the existing publication.
1530 : *
1531 : * This is dispatcher function for AlterPublicationOptions,
1532 : * AlterPublicationSchemas and AlterPublicationTables.
1533 : */
1534 : void
1535 1106 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
1536 : {
1537 : Relation rel;
1538 : HeapTuple tup;
1539 : Form_pg_publication pubform;
1540 :
1541 1106 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1542 :
1543 1106 : tup = SearchSysCacheCopy1(PUBLICATIONNAME,
1544 : CStringGetDatum(stmt->pubname));
1545 :
1546 1106 : if (!HeapTupleIsValid(tup))
1547 0 : ereport(ERROR,
1548 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1549 : errmsg("publication \"%s\" does not exist",
1550 : stmt->pubname)));
1551 :
1552 1106 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1553 :
1554 : /* must be owner */
1555 1106 : if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
1556 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
1557 0 : stmt->pubname);
1558 :
1559 1106 : if (stmt->options)
1560 128 : AlterPublicationOptions(pstate, stmt, rel, tup);
1561 : else
1562 : {
1563 978 : List *relations = NIL;
1564 978 : List *schemaidlist = NIL;
1565 978 : Oid pubid = pubform->oid;
1566 :
1567 978 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
1568 : &schemaidlist);
1569 :
1570 960 : CheckAlterPublication(stmt, tup, relations, schemaidlist);
1571 :
1572 918 : heap_freetuple(tup);
1573 :
1574 : /* Lock the publication so nobody else can do anything with it. */
1575 918 : LockDatabaseObject(PublicationRelationId, pubid, 0,
1576 : AccessExclusiveLock);
1577 :
1578 : /*
1579 : * It is possible that by the time we acquire the lock on publication,
1580 : * concurrent DDL has removed it. We can test this by checking the
1581 : * existence of publication. We get the tuple again to avoid the risk
1582 : * of any publication option getting changed.
1583 : */
1584 918 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1585 918 : if (!HeapTupleIsValid(tup))
1586 0 : ereport(ERROR,
1587 : errcode(ERRCODE_UNDEFINED_OBJECT),
1588 : errmsg("publication \"%s\" does not exist",
1589 : stmt->pubname));
1590 :
1591 918 : AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
1592 : schemaidlist != NIL);
1593 786 : AlterPublicationSchemas(stmt, tup, schemaidlist);
1594 : }
1595 :
1596 : /* Cleanup. */
1597 884 : heap_freetuple(tup);
1598 884 : table_close(rel, RowExclusiveLock);
1599 884 : }
1600 :
1601 : /*
1602 : * Remove relation from publication by mapping OID.
1603 : */
1604 : void
1605 848 : RemovePublicationRelById(Oid proid)
1606 : {
1607 : Relation rel;
1608 : HeapTuple tup;
1609 : Form_pg_publication_rel pubrel;
1610 848 : List *relids = NIL;
1611 :
1612 848 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
1613 :
1614 848 : tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
1615 :
1616 848 : if (!HeapTupleIsValid(tup))
1617 0 : elog(ERROR, "cache lookup failed for publication table %u",
1618 : proid);
1619 :
1620 848 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1621 :
1622 : /*
1623 : * Invalidate relcache so that publication info is rebuilt.
1624 : *
1625 : * For the partitioned tables, we must invalidate all partitions contained
1626 : * in the respective partition hierarchies, not just the one explicitly
1627 : * mentioned in the publication. This is required because we implicitly
1628 : * publish the child tables when the parent table is published.
1629 : */
1630 848 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
1631 : pubrel->prrelid);
1632 :
1633 848 : InvalidatePublicationRels(relids);
1634 :
1635 848 : CatalogTupleDelete(rel, &tup->t_self);
1636 :
1637 848 : ReleaseSysCache(tup);
1638 :
1639 848 : table_close(rel, RowExclusiveLock);
1640 848 : }
1641 :
1642 : /*
1643 : * Remove the publication by mapping OID.
1644 : */
1645 : void
1646 500 : RemovePublicationById(Oid pubid)
1647 : {
1648 : Relation rel;
1649 : HeapTuple tup;
1650 : Form_pg_publication pubform;
1651 :
1652 500 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1653 :
1654 500 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1655 500 : if (!HeapTupleIsValid(tup))
1656 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1657 :
1658 500 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1659 :
1660 : /* Invalidate relcache so that publication info is rebuilt. */
1661 500 : if (pubform->puballtables)
1662 70 : CacheInvalidateRelcacheAll();
1663 :
1664 500 : CatalogTupleDelete(rel, &tup->t_self);
1665 :
1666 500 : ReleaseSysCache(tup);
1667 :
1668 500 : table_close(rel, RowExclusiveLock);
1669 500 : }
1670 :
1671 : /*
1672 : * Remove schema from publication by mapping OID.
1673 : */
1674 : void
1675 192 : RemovePublicationSchemaById(Oid psoid)
1676 : {
1677 : Relation rel;
1678 : HeapTuple tup;
1679 192 : List *schemaRels = NIL;
1680 : Form_pg_publication_namespace pubsch;
1681 :
1682 192 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
1683 :
1684 192 : tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
1685 :
1686 192 : if (!HeapTupleIsValid(tup))
1687 0 : elog(ERROR, "cache lookup failed for publication schema %u", psoid);
1688 :
1689 192 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1690 :
1691 : /*
1692 : * Invalidate relcache so that publication info is rebuilt. See
1693 : * RemovePublicationRelById for why we need to consider all the
1694 : * partitions.
1695 : */
1696 192 : schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
1697 : PUBLICATION_PART_ALL);
1698 192 : InvalidatePublicationRels(schemaRels);
1699 :
1700 192 : CatalogTupleDelete(rel, &tup->t_self);
1701 :
1702 192 : ReleaseSysCache(tup);
1703 :
1704 192 : table_close(rel, RowExclusiveLock);
1705 192 : }
1706 :
1707 : /*
1708 : * Open relations specified by a PublicationTable list.
1709 : * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1710 : * add them to a publication.
1711 : */
1712 : static List *
1713 1316 : OpenTableList(List *tables)
1714 : {
1715 1316 : List *relids = NIL;
1716 1316 : List *rels = NIL;
1717 : ListCell *lc;
1718 1316 : List *relids_with_rf = NIL;
1719 1316 : List *relids_with_collist = NIL;
1720 :
1721 : /*
1722 : * Open, share-lock, and check all the explicitly-specified relations
1723 : */
1724 2700 : foreach(lc, tables)
1725 : {
1726 1414 : PublicationTable *t = lfirst_node(PublicationTable, lc);
1727 1414 : bool recurse = t->relation->inh;
1728 : Relation rel;
1729 : Oid myrelid;
1730 : PublicationRelInfo *pub_rel;
1731 :
1732 : /* Allow query cancel in case this takes a long time */
1733 1414 : CHECK_FOR_INTERRUPTS();
1734 :
1735 1414 : rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
1736 1408 : myrelid = RelationGetRelid(rel);
1737 :
1738 : /*
1739 : * Filter out duplicates if user specifies "foo, foo".
1740 : *
1741 : * Note that this algorithm is known to not be very efficient (O(N^2))
1742 : * but given that it only works on list of tables given to us by user
1743 : * it's deemed acceptable.
1744 : */
1745 1408 : if (list_member_oid(relids, myrelid))
1746 : {
1747 : /* Disallow duplicate tables if there are any with row filters. */
1748 24 : if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
1749 12 : ereport(ERROR,
1750 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1751 : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1752 : RelationGetRelationName(rel))));
1753 :
1754 : /* Disallow duplicate tables if there are any with column lists. */
1755 12 : if (t->columns || list_member_oid(relids_with_collist, myrelid))
1756 12 : ereport(ERROR,
1757 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1758 : errmsg("conflicting or redundant column lists for table \"%s\"",
1759 : RelationGetRelationName(rel))));
1760 :
1761 0 : table_close(rel, ShareUpdateExclusiveLock);
1762 0 : continue;
1763 : }
1764 :
1765 1384 : pub_rel = palloc_object(PublicationRelInfo);
1766 1384 : pub_rel->relation = rel;
1767 1384 : pub_rel->whereClause = t->whereClause;
1768 1384 : pub_rel->columns = t->columns;
1769 1384 : rels = lappend(rels, pub_rel);
1770 1384 : relids = lappend_oid(relids, myrelid);
1771 :
1772 1384 : if (t->whereClause)
1773 404 : relids_with_rf = lappend_oid(relids_with_rf, myrelid);
1774 :
1775 1384 : if (t->columns)
1776 410 : relids_with_collist = lappend_oid(relids_with_collist, myrelid);
1777 :
1778 : /*
1779 : * Add children of this rel, if requested, so that they too are added
1780 : * to the publication. A partitioned table can't have any inheritance
1781 : * children other than its partitions, which need not be explicitly
1782 : * added to the publication.
1783 : */
1784 1384 : if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1785 : {
1786 : List *children;
1787 : ListCell *child;
1788 :
1789 1160 : children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
1790 : NULL);
1791 :
1792 2328 : foreach(child, children)
1793 : {
1794 1168 : Oid childrelid = lfirst_oid(child);
1795 :
1796 : /* Allow query cancel in case this takes a long time */
1797 1168 : CHECK_FOR_INTERRUPTS();
1798 :
1799 : /*
1800 : * Skip duplicates if user specified both parent and child
1801 : * tables.
1802 : */
1803 1168 : if (list_member_oid(relids, childrelid))
1804 : {
1805 : /*
1806 : * We don't allow to specify row filter for both parent
1807 : * and child table at the same time as it is not very
1808 : * clear which one should be given preference.
1809 : */
1810 1160 : if (childrelid != myrelid &&
1811 0 : (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
1812 0 : ereport(ERROR,
1813 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1814 : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1815 : RelationGetRelationName(rel))));
1816 :
1817 : /*
1818 : * We don't allow to specify column list for both parent
1819 : * and child table at the same time as it is not very
1820 : * clear which one should be given preference.
1821 : */
1822 1160 : if (childrelid != myrelid &&
1823 0 : (t->columns || list_member_oid(relids_with_collist, childrelid)))
1824 0 : ereport(ERROR,
1825 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1826 : errmsg("conflicting or redundant column lists for table \"%s\"",
1827 : RelationGetRelationName(rel))));
1828 :
1829 1160 : continue;
1830 : }
1831 :
1832 : /* find_all_inheritors already got lock */
1833 8 : rel = table_open(childrelid, NoLock);
1834 8 : pub_rel = palloc_object(PublicationRelInfo);
1835 8 : pub_rel->relation = rel;
1836 : /* child inherits WHERE clause from parent */
1837 8 : pub_rel->whereClause = t->whereClause;
1838 :
1839 : /* child inherits column list from parent */
1840 8 : pub_rel->columns = t->columns;
1841 8 : rels = lappend(rels, pub_rel);
1842 8 : relids = lappend_oid(relids, childrelid);
1843 :
1844 8 : if (t->whereClause)
1845 2 : relids_with_rf = lappend_oid(relids_with_rf, childrelid);
1846 :
1847 8 : if (t->columns)
1848 0 : relids_with_collist = lappend_oid(relids_with_collist, childrelid);
1849 : }
1850 : }
1851 : }
1852 :
1853 1286 : list_free(relids);
1854 1286 : list_free(relids_with_rf);
1855 :
1856 1286 : return rels;
1857 : }
1858 :
1859 : /*
1860 : * Close all relations in the list.
1861 : */
1862 : static void
1863 1502 : CloseTableList(List *rels)
1864 : {
1865 : ListCell *lc;
1866 :
1867 3056 : foreach(lc, rels)
1868 : {
1869 : PublicationRelInfo *pub_rel;
1870 :
1871 1554 : pub_rel = (PublicationRelInfo *) lfirst(lc);
1872 1554 : table_close(pub_rel->relation, NoLock);
1873 : }
1874 :
1875 1502 : list_free_deep(rels);
1876 1502 : }
1877 :
1878 : /*
1879 : * Lock the schemas specified in the schema list in AccessShareLock mode in
1880 : * order to prevent concurrent schema deletion.
1881 : */
1882 : static void
1883 1036 : LockSchemaList(List *schemalist)
1884 : {
1885 : ListCell *lc;
1886 :
1887 1360 : foreach(lc, schemalist)
1888 : {
1889 324 : Oid schemaid = lfirst_oid(lc);
1890 :
1891 : /* Allow query cancel in case this takes a long time */
1892 324 : CHECK_FOR_INTERRUPTS();
1893 324 : LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
1894 :
1895 : /*
1896 : * It is possible that by the time we acquire the lock on schema,
1897 : * concurrent DDL has removed it. We can test this by checking the
1898 : * existence of schema.
1899 : */
1900 324 : if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
1901 0 : ereport(ERROR,
1902 : errcode(ERRCODE_UNDEFINED_SCHEMA),
1903 : errmsg("schema with OID %u does not exist", schemaid));
1904 : }
1905 1036 : }
1906 :
1907 : /*
1908 : * Add listed tables to the publication.
1909 : */
1910 : static void
1911 1084 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
1912 : AlterPublicationStmt *stmt)
1913 : {
1914 : ListCell *lc;
1915 :
1916 2176 : foreach(lc, rels)
1917 : {
1918 1160 : PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
1919 1160 : Relation rel = pub_rel->relation;
1920 : ObjectAddress obj;
1921 :
1922 : /* Must be owner of the table or superuser. */
1923 1160 : if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
1924 6 : aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
1925 6 : RelationGetRelationName(rel));
1926 :
1927 1154 : obj = publication_add_relation(pubid, pub_rel, if_not_exists);
1928 1092 : if (stmt)
1929 : {
1930 628 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1931 : (Node *) stmt);
1932 :
1933 628 : InvokeObjectPostCreateHook(PublicationRelRelationId,
1934 : obj.objectId, 0);
1935 : }
1936 : }
1937 1016 : }
1938 :
1939 : /*
1940 : * Remove listed tables from the publication.
1941 : */
1942 : static void
1943 504 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
1944 : {
1945 : ObjectAddress obj;
1946 : ListCell *lc;
1947 : Oid prid;
1948 :
1949 966 : foreach(lc, rels)
1950 : {
1951 480 : PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
1952 480 : Relation rel = pubrel->relation;
1953 480 : Oid relid = RelationGetRelid(rel);
1954 :
1955 480 : if (pubrel->columns)
1956 0 : ereport(ERROR,
1957 : errcode(ERRCODE_SYNTAX_ERROR),
1958 : errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
1959 :
1960 480 : prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
1961 : ObjectIdGetDatum(relid),
1962 : ObjectIdGetDatum(pubid));
1963 480 : if (!OidIsValid(prid))
1964 : {
1965 12 : if (missing_ok)
1966 0 : continue;
1967 :
1968 12 : ereport(ERROR,
1969 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1970 : errmsg("relation \"%s\" is not part of the publication",
1971 : RelationGetRelationName(rel))));
1972 : }
1973 :
1974 468 : if (pubrel->whereClause)
1975 6 : ereport(ERROR,
1976 : (errcode(ERRCODE_SYNTAX_ERROR),
1977 : errmsg("cannot use a WHERE clause when removing a table from a publication")));
1978 :
1979 462 : ObjectAddressSet(obj, PublicationRelRelationId, prid);
1980 462 : performDeletion(&obj, DROP_CASCADE, 0);
1981 : }
1982 486 : }
1983 :
1984 : /*
1985 : * Add listed schemas to the publication.
1986 : */
1987 : static void
1988 588 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
1989 : AlterPublicationStmt *stmt)
1990 : {
1991 : ListCell *lc;
1992 :
1993 838 : foreach(lc, schemas)
1994 : {
1995 262 : Oid schemaid = lfirst_oid(lc);
1996 : ObjectAddress obj;
1997 :
1998 262 : obj = publication_add_schema(pubid, schemaid, if_not_exists);
1999 250 : if (stmt)
2000 : {
2001 68 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
2002 : (Node *) stmt);
2003 :
2004 68 : InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
2005 : obj.objectId, 0);
2006 : }
2007 : }
2008 576 : }
2009 :
2010 : /*
2011 : * Remove listed schemas from the publication.
2012 : */
2013 : static void
2014 442 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
2015 : {
2016 : ObjectAddress obj;
2017 : ListCell *lc;
2018 : Oid psid;
2019 :
2020 492 : foreach(lc, schemas)
2021 : {
2022 56 : Oid schemaid = lfirst_oid(lc);
2023 :
2024 56 : psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
2025 : Anum_pg_publication_namespace_oid,
2026 : ObjectIdGetDatum(schemaid),
2027 : ObjectIdGetDatum(pubid));
2028 56 : if (!OidIsValid(psid))
2029 : {
2030 6 : if (missing_ok)
2031 0 : continue;
2032 :
2033 6 : ereport(ERROR,
2034 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2035 : errmsg("tables from schema \"%s\" are not part of the publication",
2036 : get_namespace_name(schemaid))));
2037 : }
2038 :
2039 50 : ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
2040 50 : performDeletion(&obj, DROP_CASCADE, 0);
2041 : }
2042 436 : }
2043 :
2044 : /*
2045 : * Internal workhorse for changing a publication owner
2046 : */
2047 : static void
2048 36 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
2049 : {
2050 : Form_pg_publication form;
2051 :
2052 36 : form = (Form_pg_publication) GETSTRUCT(tup);
2053 :
2054 36 : if (form->pubowner == newOwnerId)
2055 12 : return;
2056 :
2057 24 : if (!superuser())
2058 : {
2059 : AclResult aclresult;
2060 :
2061 : /* Must be owner */
2062 12 : if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
2063 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
2064 0 : NameStr(form->pubname));
2065 :
2066 : /* Must be able to become new owner */
2067 12 : check_can_set_role(GetUserId(), newOwnerId);
2068 :
2069 : /* New owner must have CREATE privilege on database */
2070 12 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
2071 12 : if (aclresult != ACLCHECK_OK)
2072 0 : aclcheck_error(aclresult, OBJECT_DATABASE,
2073 0 : get_database_name(MyDatabaseId));
2074 :
2075 12 : if (!superuser_arg(newOwnerId))
2076 : {
2077 12 : if (form->puballtables || form->puballsequences ||
2078 6 : is_schema_publication(form->oid))
2079 6 : ereport(ERROR,
2080 : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2081 : errmsg("permission denied to change owner of publication \"%s\"",
2082 : NameStr(form->pubname)),
2083 : errhint("The owner of a FOR ALL TABLES or ALL SEQUENCES or TABLES IN SCHEMA publication must be a superuser."));
2084 : }
2085 : }
2086 :
2087 18 : form->pubowner = newOwnerId;
2088 18 : CatalogTupleUpdate(rel, &tup->t_self, tup);
2089 :
2090 : /* Update owner dependency reference */
2091 18 : changeDependencyOnOwner(PublicationRelationId,
2092 : form->oid,
2093 : newOwnerId);
2094 :
2095 18 : InvokeObjectPostAlterHook(PublicationRelationId,
2096 : form->oid, 0);
2097 : }
2098 :
2099 : /*
2100 : * Change publication owner -- by name
2101 : */
2102 : ObjectAddress
2103 36 : AlterPublicationOwner(const char *name, Oid newOwnerId)
2104 : {
2105 : Oid pubid;
2106 : HeapTuple tup;
2107 : Relation rel;
2108 : ObjectAddress address;
2109 : Form_pg_publication pubform;
2110 :
2111 36 : rel = table_open(PublicationRelationId, RowExclusiveLock);
2112 :
2113 36 : tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
2114 :
2115 36 : if (!HeapTupleIsValid(tup))
2116 0 : ereport(ERROR,
2117 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2118 : errmsg("publication \"%s\" does not exist", name)));
2119 :
2120 36 : pubform = (Form_pg_publication) GETSTRUCT(tup);
2121 36 : pubid = pubform->oid;
2122 :
2123 36 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
2124 :
2125 30 : ObjectAddressSet(address, PublicationRelationId, pubid);
2126 :
2127 30 : heap_freetuple(tup);
2128 :
2129 30 : table_close(rel, RowExclusiveLock);
2130 :
2131 30 : return address;
2132 : }
2133 :
2134 : /*
2135 : * Change publication owner -- by OID
2136 : */
2137 : void
2138 0 : AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId)
2139 : {
2140 : HeapTuple tup;
2141 : Relation rel;
2142 :
2143 0 : rel = table_open(PublicationRelationId, RowExclusiveLock);
2144 :
2145 0 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
2146 :
2147 0 : if (!HeapTupleIsValid(tup))
2148 0 : ereport(ERROR,
2149 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2150 : errmsg("publication with OID %u does not exist", pubid)));
2151 :
2152 0 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
2153 :
2154 0 : heap_freetuple(tup);
2155 :
2156 0 : table_close(rel, RowExclusiveLock);
2157 0 : }
2158 :
2159 : /*
2160 : * Extract the publish_generated_columns option value from a DefElem. "stored"
2161 : * and "none" values are accepted.
2162 : */
2163 : static char
2164 76 : defGetGeneratedColsOption(DefElem *def)
2165 : {
2166 76 : char *sval = "";
2167 :
2168 : /*
2169 : * A parameter value is required.
2170 : */
2171 76 : if (def->arg)
2172 : {
2173 70 : sval = defGetString(def);
2174 :
2175 70 : if (pg_strcasecmp(sval, "none") == 0)
2176 22 : return PUBLISH_GENCOLS_NONE;
2177 48 : if (pg_strcasecmp(sval, "stored") == 0)
2178 42 : return PUBLISH_GENCOLS_STORED;
2179 : }
2180 :
2181 12 : ereport(ERROR,
2182 : errcode(ERRCODE_SYNTAX_ERROR),
2183 : errmsg("invalid value for publication parameter \"%s\": \"%s\"", def->defname, sval),
2184 : errdetail("Valid values are \"%s\" and \"%s\".", "none", "stored"));
2185 :
2186 : return PUBLISH_GENCOLS_NONE; /* keep compiler quiet */
2187 : }
|