Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * subscriptioncmds.c
4 : * subscription catalog manipulation functions
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/subscriptioncmds.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/commit_ts.h"
18 : #include "access/htup_details.h"
19 : #include "access/table.h"
20 : #include "access/twophase.h"
21 : #include "access/xact.h"
22 : #include "catalog/catalog.h"
23 : #include "catalog/dependency.h"
24 : #include "catalog/indexing.h"
25 : #include "catalog/namespace.h"
26 : #include "catalog/objectaccess.h"
27 : #include "catalog/objectaddress.h"
28 : #include "catalog/pg_authid_d.h"
29 : #include "catalog/pg_database_d.h"
30 : #include "catalog/pg_subscription.h"
31 : #include "catalog/pg_subscription_rel.h"
32 : #include "catalog/pg_type.h"
33 : #include "commands/defrem.h"
34 : #include "commands/event_trigger.h"
35 : #include "commands/subscriptioncmds.h"
36 : #include "executor/executor.h"
37 : #include "miscadmin.h"
38 : #include "nodes/makefuncs.h"
39 : #include "pgstat.h"
40 : #include "replication/logicallauncher.h"
41 : #include "replication/logicalworker.h"
42 : #include "replication/origin.h"
43 : #include "replication/slot.h"
44 : #include "replication/walreceiver.h"
45 : #include "replication/walsender.h"
46 : #include "replication/worker_internal.h"
47 : #include "storage/lmgr.h"
48 : #include "utils/acl.h"
49 : #include "utils/builtins.h"
50 : #include "utils/guc.h"
51 : #include "utils/lsyscache.h"
52 : #include "utils/memutils.h"
53 : #include "utils/pg_lsn.h"
54 : #include "utils/syscache.h"
55 :
56 : /*
57 : * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
58 : * command.
59 : */
60 : #define SUBOPT_CONNECT 0x00000001
61 : #define SUBOPT_ENABLED 0x00000002
62 : #define SUBOPT_CREATE_SLOT 0x00000004
63 : #define SUBOPT_SLOT_NAME 0x00000008
64 : #define SUBOPT_COPY_DATA 0x00000010
65 : #define SUBOPT_SYNCHRONOUS_COMMIT 0x00000020
66 : #define SUBOPT_REFRESH 0x00000040
67 : #define SUBOPT_BINARY 0x00000080
68 : #define SUBOPT_STREAMING 0x00000100
69 : #define SUBOPT_TWOPHASE_COMMIT 0x00000200
70 : #define SUBOPT_DISABLE_ON_ERR 0x00000400
71 : #define SUBOPT_PASSWORD_REQUIRED 0x00000800
72 : #define SUBOPT_RUN_AS_OWNER 0x00001000
73 : #define SUBOPT_FAILOVER 0x00002000
74 : #define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000
75 : #define SUBOPT_MAX_RETENTION_DURATION 0x00008000
76 : #define SUBOPT_LSN 0x00010000
77 : #define SUBOPT_ORIGIN 0x00020000
78 :
79 : /* check if the 'val' has 'bits' set */
80 : #define IsSet(val, bits) (((val) & (bits)) == (bits))
81 :
82 : /*
83 : * Structure to hold a bitmap representing the user-provided CREATE/ALTER
84 : * SUBSCRIPTION command options and the parsed/default values of each of them.
85 : */
86 : typedef struct SubOpts
87 : {
88 : bits32 specified_opts;
89 : char *slot_name;
90 : char *synchronous_commit;
91 : bool connect;
92 : bool enabled;
93 : bool create_slot;
94 : bool copy_data;
95 : bool refresh;
96 : bool binary;
97 : char streaming;
98 : bool twophase;
99 : bool disableonerr;
100 : bool passwordrequired;
101 : bool runasowner;
102 : bool failover;
103 : bool retaindeadtuples;
104 : int32 maxretention;
105 : char *origin;
106 : XLogRecPtr lsn;
107 : } SubOpts;
108 :
109 : static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
110 : static void check_publications_origin(WalReceiverConn *wrconn,
111 : List *publications, bool copydata,
112 : bool retain_dead_tuples, char *origin,
113 : Oid *subrel_local_oids, int subrel_count,
114 : char *subname);
115 : static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn);
116 : static void check_duplicates_in_publist(List *publist, Datum *datums);
117 : static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
118 : static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
119 : static void CheckAlterSubOption(Subscription *sub, const char *option,
120 : bool slot_needs_update, bool isTopLevel);
121 :
122 :
123 : /*
124 : * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
125 : *
126 : * Since not all options can be specified in both commands, this function
127 : * will report an error if mutually exclusive options are specified.
128 : */
129 : static void
130 968 : parse_subscription_options(ParseState *pstate, List *stmt_options,
131 : bits32 supported_opts, SubOpts *opts)
132 : {
133 : ListCell *lc;
134 :
135 : /* Start out with cleared opts. */
136 968 : memset(opts, 0, sizeof(SubOpts));
137 :
138 : /* caller must expect some option */
139 : Assert(supported_opts != 0);
140 :
141 : /* If connect option is supported, these others also need to be. */
142 : Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
143 : IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
144 : SUBOPT_COPY_DATA));
145 :
146 : /* Set default values for the supported options. */
147 968 : if (IsSet(supported_opts, SUBOPT_CONNECT))
148 480 : opts->connect = true;
149 968 : if (IsSet(supported_opts, SUBOPT_ENABLED))
150 584 : opts->enabled = true;
151 968 : if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
152 480 : opts->create_slot = true;
153 968 : if (IsSet(supported_opts, SUBOPT_COPY_DATA))
154 622 : opts->copy_data = true;
155 968 : if (IsSet(supported_opts, SUBOPT_REFRESH))
156 86 : opts->refresh = true;
157 968 : if (IsSet(supported_opts, SUBOPT_BINARY))
158 698 : opts->binary = false;
159 968 : if (IsSet(supported_opts, SUBOPT_STREAMING))
160 698 : opts->streaming = LOGICALREP_STREAM_PARALLEL;
161 968 : if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
162 698 : opts->twophase = false;
163 968 : if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
164 698 : opts->disableonerr = false;
165 968 : if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED))
166 698 : opts->passwordrequired = true;
167 968 : if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER))
168 698 : opts->runasowner = false;
169 968 : if (IsSet(supported_opts, SUBOPT_FAILOVER))
170 698 : opts->failover = false;
171 968 : if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES))
172 698 : opts->retaindeadtuples = false;
173 968 : if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION))
174 698 : opts->maxretention = 0;
175 968 : if (IsSet(supported_opts, SUBOPT_ORIGIN))
176 698 : opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
177 :
178 : /* Parse options */
179 1918 : foreach(lc, stmt_options)
180 : {
181 1016 : DefElem *defel = (DefElem *) lfirst(lc);
182 :
183 1016 : if (IsSet(supported_opts, SUBOPT_CONNECT) &&
184 592 : strcmp(defel->defname, "connect") == 0)
185 : {
186 190 : if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
187 0 : errorConflictingDefElem(defel, pstate);
188 :
189 190 : opts->specified_opts |= SUBOPT_CONNECT;
190 190 : opts->connect = defGetBoolean(defel);
191 : }
192 826 : else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
193 506 : strcmp(defel->defname, "enabled") == 0)
194 : {
195 142 : if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
196 0 : errorConflictingDefElem(defel, pstate);
197 :
198 142 : opts->specified_opts |= SUBOPT_ENABLED;
199 142 : opts->enabled = defGetBoolean(defel);
200 : }
201 684 : else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
202 364 : strcmp(defel->defname, "create_slot") == 0)
203 : {
204 40 : if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
205 0 : errorConflictingDefElem(defel, pstate);
206 :
207 40 : opts->specified_opts |= SUBOPT_CREATE_SLOT;
208 40 : opts->create_slot = defGetBoolean(defel);
209 : }
210 644 : else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
211 546 : strcmp(defel->defname, "slot_name") == 0)
212 : {
213 156 : if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
214 0 : errorConflictingDefElem(defel, pstate);
215 :
216 156 : opts->specified_opts |= SUBOPT_SLOT_NAME;
217 156 : opts->slot_name = defGetString(defel);
218 :
219 : /* Setting slot_name = NONE is treated as no slot name. */
220 156 : if (strcmp(opts->slot_name, "none") == 0)
221 122 : opts->slot_name = NULL;
222 : else
223 34 : ReplicationSlotValidateName(opts->slot_name, false, ERROR);
224 : }
225 488 : else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
226 320 : strcmp(defel->defname, "copy_data") == 0)
227 : {
228 56 : if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
229 0 : errorConflictingDefElem(defel, pstate);
230 :
231 56 : opts->specified_opts |= SUBOPT_COPY_DATA;
232 56 : opts->copy_data = defGetBoolean(defel);
233 : }
234 432 : else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
235 342 : strcmp(defel->defname, "synchronous_commit") == 0)
236 : {
237 12 : if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
238 0 : errorConflictingDefElem(defel, pstate);
239 :
240 12 : opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
241 12 : opts->synchronous_commit = defGetString(defel);
242 :
243 : /* Test if the given value is valid for synchronous_commit GUC. */
244 12 : (void) set_config_option("synchronous_commit", opts->synchronous_commit,
245 : PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
246 : false, 0, false);
247 : }
248 420 : else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
249 66 : strcmp(defel->defname, "refresh") == 0)
250 : {
251 66 : if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
252 0 : errorConflictingDefElem(defel, pstate);
253 :
254 66 : opts->specified_opts |= SUBOPT_REFRESH;
255 66 : opts->refresh = defGetBoolean(defel);
256 : }
257 354 : else if (IsSet(supported_opts, SUBOPT_BINARY) &&
258 330 : strcmp(defel->defname, "binary") == 0)
259 : {
260 32 : if (IsSet(opts->specified_opts, SUBOPT_BINARY))
261 0 : errorConflictingDefElem(defel, pstate);
262 :
263 32 : opts->specified_opts |= SUBOPT_BINARY;
264 32 : opts->binary = defGetBoolean(defel);
265 : }
266 322 : else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
267 298 : strcmp(defel->defname, "streaming") == 0)
268 : {
269 74 : if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
270 0 : errorConflictingDefElem(defel, pstate);
271 :
272 74 : opts->specified_opts |= SUBOPT_STREAMING;
273 74 : opts->streaming = defGetStreamingMode(defel);
274 : }
275 248 : else if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
276 224 : strcmp(defel->defname, "two_phase") == 0)
277 : {
278 42 : if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
279 0 : errorConflictingDefElem(defel, pstate);
280 :
281 42 : opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
282 42 : opts->twophase = defGetBoolean(defel);
283 : }
284 206 : else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
285 182 : strcmp(defel->defname, "disable_on_error") == 0)
286 : {
287 20 : if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
288 0 : errorConflictingDefElem(defel, pstate);
289 :
290 20 : opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
291 20 : opts->disableonerr = defGetBoolean(defel);
292 : }
293 186 : else if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED) &&
294 162 : strcmp(defel->defname, "password_required") == 0)
295 : {
296 24 : if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
297 0 : errorConflictingDefElem(defel, pstate);
298 :
299 24 : opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
300 24 : opts->passwordrequired = defGetBoolean(defel);
301 : }
302 162 : else if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER) &&
303 138 : strcmp(defel->defname, "run_as_owner") == 0)
304 : {
305 18 : if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
306 0 : errorConflictingDefElem(defel, pstate);
307 :
308 18 : opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
309 18 : opts->runasowner = defGetBoolean(defel);
310 : }
311 144 : else if (IsSet(supported_opts, SUBOPT_FAILOVER) &&
312 120 : strcmp(defel->defname, "failover") == 0)
313 : {
314 26 : if (IsSet(opts->specified_opts, SUBOPT_FAILOVER))
315 0 : errorConflictingDefElem(defel, pstate);
316 :
317 26 : opts->specified_opts |= SUBOPT_FAILOVER;
318 26 : opts->failover = defGetBoolean(defel);
319 : }
320 118 : else if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES) &&
321 94 : strcmp(defel->defname, "retain_dead_tuples") == 0)
322 : {
323 24 : if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
324 0 : errorConflictingDefElem(defel, pstate);
325 :
326 24 : opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
327 24 : opts->retaindeadtuples = defGetBoolean(defel);
328 : }
329 94 : else if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION) &&
330 70 : strcmp(defel->defname, "max_retention_duration") == 0)
331 : {
332 22 : if (IsSet(opts->specified_opts, SUBOPT_MAX_RETENTION_DURATION))
333 0 : errorConflictingDefElem(defel, pstate);
334 :
335 22 : opts->specified_opts |= SUBOPT_MAX_RETENTION_DURATION;
336 22 : opts->maxretention = defGetInt32(defel);
337 : }
338 72 : else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
339 48 : strcmp(defel->defname, "origin") == 0)
340 : {
341 42 : if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
342 0 : errorConflictingDefElem(defel, pstate);
343 :
344 42 : opts->specified_opts |= SUBOPT_ORIGIN;
345 42 : pfree(opts->origin);
346 :
347 : /*
348 : * Even though the "origin" parameter allows only "none" and "any"
349 : * values, it is implemented as a string type so that the
350 : * parameter can be extended in future versions to support
351 : * filtering using origin names specified by the user.
352 : */
353 42 : opts->origin = defGetString(defel);
354 :
355 58 : if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
356 16 : (pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
357 6 : ereport(ERROR,
358 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
359 : errmsg("unrecognized origin value: \"%s\"", opts->origin));
360 : }
361 30 : else if (IsSet(supported_opts, SUBOPT_LSN) &&
362 24 : strcmp(defel->defname, "lsn") == 0)
363 18 : {
364 24 : char *lsn_str = defGetString(defel);
365 : XLogRecPtr lsn;
366 :
367 24 : if (IsSet(opts->specified_opts, SUBOPT_LSN))
368 0 : errorConflictingDefElem(defel, pstate);
369 :
370 : /* Setting lsn = NONE is treated as resetting LSN */
371 24 : if (strcmp(lsn_str, "none") == 0)
372 6 : lsn = InvalidXLogRecPtr;
373 : else
374 : {
375 : /* Parse the argument as LSN */
376 18 : lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
377 : CStringGetDatum(lsn_str)));
378 :
379 18 : if (XLogRecPtrIsInvalid(lsn))
380 6 : ereport(ERROR,
381 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
382 : errmsg("invalid WAL location (LSN): %s", lsn_str)));
383 : }
384 :
385 18 : opts->specified_opts |= SUBOPT_LSN;
386 18 : opts->lsn = lsn;
387 : }
388 : else
389 6 : ereport(ERROR,
390 : (errcode(ERRCODE_SYNTAX_ERROR),
391 : errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
392 : }
393 :
394 : /*
395 : * We've been explicitly asked to not connect, that requires some
396 : * additional processing.
397 : */
398 902 : if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
399 : {
400 : /* Check for incompatible options from the user. */
401 148 : if (opts->enabled &&
402 148 : IsSet(opts->specified_opts, SUBOPT_ENABLED))
403 6 : ereport(ERROR,
404 : (errcode(ERRCODE_SYNTAX_ERROR),
405 : /*- translator: both %s are strings of the form "option = value" */
406 : errmsg("%s and %s are mutually exclusive options",
407 : "connect = false", "enabled = true")));
408 :
409 142 : if (opts->create_slot &&
410 136 : IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
411 6 : ereport(ERROR,
412 : (errcode(ERRCODE_SYNTAX_ERROR),
413 : errmsg("%s and %s are mutually exclusive options",
414 : "connect = false", "create_slot = true")));
415 :
416 136 : if (opts->copy_data &&
417 130 : IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
418 6 : ereport(ERROR,
419 : (errcode(ERRCODE_SYNTAX_ERROR),
420 : errmsg("%s and %s are mutually exclusive options",
421 : "connect = false", "copy_data = true")));
422 :
423 : /* Change the defaults of other options. */
424 130 : opts->enabled = false;
425 130 : opts->create_slot = false;
426 130 : opts->copy_data = false;
427 : }
428 :
429 : /*
430 : * Do additional checking for disallowed combination when slot_name = NONE
431 : * was used.
432 : */
433 884 : if (!opts->slot_name &&
434 856 : IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
435 : {
436 116 : if (opts->enabled)
437 : {
438 18 : if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
439 6 : ereport(ERROR,
440 : (errcode(ERRCODE_SYNTAX_ERROR),
441 : /*- translator: both %s are strings of the form "option = value" */
442 : errmsg("%s and %s are mutually exclusive options",
443 : "slot_name = NONE", "enabled = true")));
444 : else
445 12 : ereport(ERROR,
446 : (errcode(ERRCODE_SYNTAX_ERROR),
447 : /*- translator: both %s are strings of the form "option = value" */
448 : errmsg("subscription with %s must also set %s",
449 : "slot_name = NONE", "enabled = false")));
450 : }
451 :
452 98 : if (opts->create_slot)
453 : {
454 12 : if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
455 6 : ereport(ERROR,
456 : (errcode(ERRCODE_SYNTAX_ERROR),
457 : /*- translator: both %s are strings of the form "option = value" */
458 : errmsg("%s and %s are mutually exclusive options",
459 : "slot_name = NONE", "create_slot = true")));
460 : else
461 6 : ereport(ERROR,
462 : (errcode(ERRCODE_SYNTAX_ERROR),
463 : /*- translator: both %s are strings of the form "option = value" */
464 : errmsg("subscription with %s must also set %s",
465 : "slot_name = NONE", "create_slot = false")));
466 : }
467 : }
468 854 : }
469 :
470 : /*
471 : * Check that the specified publications are present on the publisher.
472 : */
473 : static void
474 254 : check_publications(WalReceiverConn *wrconn, List *publications)
475 : {
476 : WalRcvExecResult *res;
477 : StringInfo cmd;
478 : TupleTableSlot *slot;
479 254 : List *publicationsCopy = NIL;
480 254 : Oid tableRow[1] = {TEXTOID};
481 :
482 254 : cmd = makeStringInfo();
483 254 : appendStringInfoString(cmd, "SELECT t.pubname FROM\n"
484 : " pg_catalog.pg_publication t WHERE\n"
485 : " t.pubname IN (");
486 254 : GetPublicationsStr(publications, cmd, true);
487 254 : appendStringInfoChar(cmd, ')');
488 :
489 254 : res = walrcv_exec(wrconn, cmd->data, 1, tableRow);
490 254 : destroyStringInfo(cmd);
491 :
492 254 : if (res->status != WALRCV_OK_TUPLES)
493 0 : ereport(ERROR,
494 : errmsg("could not receive list of publications from the publisher: %s",
495 : res->err));
496 :
497 254 : publicationsCopy = list_copy(publications);
498 :
499 : /* Process publication(s). */
500 254 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
501 560 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
502 : {
503 : char *pubname;
504 : bool isnull;
505 :
506 306 : pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
507 : Assert(!isnull);
508 :
509 : /* Delete the publication present in publisher from the list. */
510 306 : publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
511 306 : ExecClearTuple(slot);
512 : }
513 :
514 254 : ExecDropSingleTupleTableSlot(slot);
515 :
516 254 : walrcv_clear_result(res);
517 :
518 254 : if (list_length(publicationsCopy))
519 : {
520 : /* Prepare the list of non-existent publication(s) for error message. */
521 8 : StringInfo pubnames = makeStringInfo();
522 :
523 8 : GetPublicationsStr(publicationsCopy, pubnames, false);
524 8 : ereport(WARNING,
525 : errcode(ERRCODE_UNDEFINED_OBJECT),
526 : errmsg_plural("publication %s does not exist on the publisher",
527 : "publications %s do not exist on the publisher",
528 : list_length(publicationsCopy),
529 : pubnames->data));
530 : }
531 254 : }
532 :
533 : /*
534 : * Auxiliary function to build a text array out of a list of String nodes.
535 : */
536 : static Datum
537 392 : publicationListToArray(List *publist)
538 : {
539 : ArrayType *arr;
540 : Datum *datums;
541 : MemoryContext memcxt;
542 : MemoryContext oldcxt;
543 :
544 : /* Create memory context for temporary allocations. */
545 392 : memcxt = AllocSetContextCreate(CurrentMemoryContext,
546 : "publicationListToArray to array",
547 : ALLOCSET_DEFAULT_SIZES);
548 392 : oldcxt = MemoryContextSwitchTo(memcxt);
549 :
550 392 : datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
551 :
552 392 : check_duplicates_in_publist(publist, datums);
553 :
554 386 : MemoryContextSwitchTo(oldcxt);
555 :
556 386 : arr = construct_array_builtin(datums, list_length(publist), TEXTOID);
557 :
558 386 : MemoryContextDelete(memcxt);
559 :
560 386 : return PointerGetDatum(arr);
561 : }
562 :
563 : /*
564 : * Create new subscription.
565 : */
566 : ObjectAddress
567 480 : CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
568 : bool isTopLevel)
569 : {
570 : Relation rel;
571 : ObjectAddress myself;
572 : Oid subid;
573 : bool nulls[Natts_pg_subscription];
574 : Datum values[Natts_pg_subscription];
575 480 : Oid owner = GetUserId();
576 : HeapTuple tup;
577 : char *conninfo;
578 : char originname[NAMEDATALEN];
579 : List *publications;
580 : bits32 supported_opts;
581 480 : SubOpts opts = {0};
582 : AclResult aclresult;
583 :
584 : /*
585 : * Parse and check options.
586 : *
587 : * Connection and publication should not be specified here.
588 : */
589 480 : supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
590 : SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
591 : SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
592 : SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
593 : SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
594 : SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
595 : SUBOPT_RETAIN_DEAD_TUPLES |
596 : SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN);
597 480 : parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
598 :
599 : /*
600 : * Since creating a replication slot is not transactional, rolling back
601 : * the transaction leaves the created replication slot. So we cannot run
602 : * CREATE SUBSCRIPTION inside a transaction block if creating a
603 : * replication slot.
604 : */
605 390 : if (opts.create_slot)
606 250 : PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
607 :
608 : /*
609 : * We don't want to allow unprivileged users to be able to trigger
610 : * attempts to access arbitrary network destinations, so require the user
611 : * to have been specifically authorized to create subscriptions.
612 : */
613 384 : if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
614 6 : ereport(ERROR,
615 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
616 : errmsg("permission denied to create subscription"),
617 : errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
618 : "pg_create_subscription")));
619 :
620 : /*
621 : * Since a subscription is a database object, we also check for CREATE
622 : * permission on the database.
623 : */
624 378 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
625 : owner, ACL_CREATE);
626 378 : if (aclresult != ACLCHECK_OK)
627 12 : aclcheck_error(aclresult, OBJECT_DATABASE,
628 6 : get_database_name(MyDatabaseId));
629 :
630 : /*
631 : * Non-superusers are required to set a password for authentication, and
632 : * that password must be used by the target server, but the superuser can
633 : * exempt a subscription from this requirement.
634 : */
635 372 : if (!opts.passwordrequired && !superuser_arg(owner))
636 6 : ereport(ERROR,
637 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
638 : errmsg("password_required=false is superuser-only"),
639 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
640 :
641 : /*
642 : * If built with appropriate switch, whine when regression-testing
643 : * conventions for subscription names are violated.
644 : */
645 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
646 : if (strncmp(stmt->subname, "regress_", 8) != 0)
647 : elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
648 : #endif
649 :
650 366 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
651 :
652 : /* Check if name is used */
653 366 : subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
654 : ObjectIdGetDatum(MyDatabaseId), CStringGetDatum(stmt->subname));
655 366 : if (OidIsValid(subid))
656 : {
657 6 : ereport(ERROR,
658 : (errcode(ERRCODE_DUPLICATE_OBJECT),
659 : errmsg("subscription \"%s\" already exists",
660 : stmt->subname)));
661 : }
662 :
663 : /*
664 : * Ensure that system configuration paramters are set appropriately to
665 : * support retain_dead_tuples and max_retention_duration.
666 : */
667 360 : CheckSubDeadTupleRetention(true, !opts.enabled, WARNING,
668 360 : opts.retaindeadtuples, opts.retaindeadtuples,
669 360 : (opts.maxretention > 0));
670 :
671 360 : if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
672 318 : opts.slot_name == NULL)
673 318 : opts.slot_name = stmt->subname;
674 :
675 : /* The default for synchronous_commit of subscriptions is off. */
676 360 : if (opts.synchronous_commit == NULL)
677 360 : opts.synchronous_commit = "off";
678 :
679 360 : conninfo = stmt->conninfo;
680 360 : publications = stmt->publication;
681 :
682 : /* Load the library providing us libpq calls. */
683 360 : load_file("libpqwalreceiver", false);
684 :
685 : /* Check the connection info string. */
686 360 : walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
687 :
688 : /* Everything ok, form a new tuple. */
689 342 : memset(values, 0, sizeof(values));
690 342 : memset(nulls, false, sizeof(nulls));
691 :
692 342 : subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
693 : Anum_pg_subscription_oid);
694 342 : values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
695 342 : values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
696 342 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
697 342 : values[Anum_pg_subscription_subname - 1] =
698 342 : DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
699 342 : values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
700 342 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
701 342 : values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
702 342 : values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
703 342 : values[Anum_pg_subscription_subtwophasestate - 1] =
704 342 : CharGetDatum(opts.twophase ?
705 : LOGICALREP_TWOPHASE_STATE_PENDING :
706 : LOGICALREP_TWOPHASE_STATE_DISABLED);
707 342 : values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
708 342 : values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
709 342 : values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
710 342 : values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
711 342 : values[Anum_pg_subscription_subretaindeadtuples - 1] =
712 342 : BoolGetDatum(opts.retaindeadtuples);
713 342 : values[Anum_pg_subscription_submaxretention - 1] =
714 342 : Int32GetDatum(opts.maxretention);
715 342 : values[Anum_pg_subscription_subretentionactive - 1] =
716 342 : Int32GetDatum(opts.retaindeadtuples);
717 342 : values[Anum_pg_subscription_subconninfo - 1] =
718 342 : CStringGetTextDatum(conninfo);
719 342 : if (opts.slot_name)
720 322 : values[Anum_pg_subscription_subslotname - 1] =
721 322 : DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
722 : else
723 20 : nulls[Anum_pg_subscription_subslotname - 1] = true;
724 342 : values[Anum_pg_subscription_subsynccommit - 1] =
725 342 : CStringGetTextDatum(opts.synchronous_commit);
726 336 : values[Anum_pg_subscription_subpublications - 1] =
727 342 : publicationListToArray(publications);
728 336 : values[Anum_pg_subscription_suborigin - 1] =
729 336 : CStringGetTextDatum(opts.origin);
730 :
731 336 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
732 :
733 : /* Insert tuple into catalog. */
734 336 : CatalogTupleInsert(rel, tup);
735 336 : heap_freetuple(tup);
736 :
737 336 : recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
738 :
739 336 : ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
740 336 : replorigin_create(originname);
741 :
742 : /*
743 : * Connect to remote side to execute requested commands and fetch table
744 : * info.
745 : */
746 336 : if (opts.connect)
747 : {
748 : char *err;
749 : WalReceiverConn *wrconn;
750 : List *tables;
751 : ListCell *lc;
752 : char table_state;
753 : bool must_use_password;
754 :
755 : /* Try to connect to the publisher. */
756 242 : must_use_password = !superuser_arg(owner) && opts.passwordrequired;
757 242 : wrconn = walrcv_connect(conninfo, true, true, must_use_password,
758 : stmt->subname, &err);
759 242 : if (!wrconn)
760 6 : ereport(ERROR,
761 : (errcode(ERRCODE_CONNECTION_FAILURE),
762 : errmsg("subscription \"%s\" could not connect to the publisher: %s",
763 : stmt->subname, err)));
764 :
765 236 : PG_TRY();
766 : {
767 236 : check_publications(wrconn, publications);
768 236 : check_publications_origin(wrconn, publications, opts.copy_data,
769 236 : opts.retaindeadtuples, opts.origin,
770 : NULL, 0, stmt->subname);
771 :
772 236 : if (opts.retaindeadtuples)
773 6 : check_pub_dead_tuple_retention(wrconn);
774 :
775 : /*
776 : * Set sync state based on if we were asked to do data copy or
777 : * not.
778 : */
779 236 : table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
780 :
781 : /*
782 : * Get the table list from publisher and build local table status
783 : * info.
784 : */
785 236 : tables = fetch_table_list(wrconn, publications);
786 592 : foreach(lc, tables)
787 : {
788 358 : RangeVar *rv = (RangeVar *) lfirst(lc);
789 : Oid relid;
790 :
791 358 : relid = RangeVarGetRelid(rv, AccessShareLock, false);
792 :
793 : /* Check for supported relkind. */
794 358 : CheckSubscriptionRelkind(get_rel_relkind(relid),
795 358 : rv->schemaname, rv->relname);
796 :
797 358 : AddSubscriptionRelState(subid, relid, table_state,
798 : InvalidXLogRecPtr, true);
799 : }
800 :
801 : /*
802 : * If requested, create permanent slot for the subscription. We
803 : * won't use the initial snapshot for anything, so no need to
804 : * export it.
805 : */
806 234 : if (opts.create_slot)
807 : {
808 224 : bool twophase_enabled = false;
809 :
810 : Assert(opts.slot_name);
811 :
812 : /*
813 : * Even if two_phase is set, don't create the slot with
814 : * two-phase enabled. Will enable it once all the tables are
815 : * synced and ready. This avoids race-conditions like prepared
816 : * transactions being skipped due to changes not being applied
817 : * due to checks in should_apply_changes_for_rel() when
818 : * tablesync for the corresponding tables are in progress. See
819 : * comments atop worker.c.
820 : *
821 : * Note that if tables were specified but copy_data is false
822 : * then it is safe to enable two_phase up-front because those
823 : * tables are already initially in READY state. When the
824 : * subscription has no tables, we leave the twophase state as
825 : * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
826 : * PUBLICATION to work.
827 : */
828 224 : if (opts.twophase && !opts.copy_data && tables != NIL)
829 2 : twophase_enabled = true;
830 :
831 224 : walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
832 : opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
833 :
834 224 : if (twophase_enabled)
835 2 : UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
836 :
837 224 : ereport(NOTICE,
838 : (errmsg("created replication slot \"%s\" on publisher",
839 : opts.slot_name)));
840 : }
841 : }
842 2 : PG_FINALLY();
843 : {
844 236 : walrcv_disconnect(wrconn);
845 : }
846 236 : PG_END_TRY();
847 : }
848 : else
849 94 : ereport(WARNING,
850 : (errmsg("subscription was created, but is not connected"),
851 : errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.")));
852 :
853 328 : table_close(rel, RowExclusiveLock);
854 :
855 328 : pgstat_create_subscription(subid);
856 :
857 : /*
858 : * Notify the launcher to start the apply worker if the subscription is
859 : * enabled, or to create the conflict detection slot if retain_dead_tuples
860 : * is enabled.
861 : *
862 : * Creating the conflict detection slot is essential even when the
863 : * subscription is not enabled. This ensures that dead tuples are
864 : * retained, which is necessary for accurately identifying the type of
865 : * conflict during replication.
866 : */
867 328 : if (opts.enabled || opts.retaindeadtuples)
868 224 : ApplyLauncherWakeupAtCommit();
869 :
870 328 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
871 :
872 328 : InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
873 :
874 328 : return myself;
875 : }
876 :
877 : static void
878 70 : AlterSubscription_refresh(Subscription *sub, bool copy_data,
879 : List *validate_publications)
880 : {
881 : char *err;
882 : List *pubrel_names;
883 : List *subrel_states;
884 : Oid *subrel_local_oids;
885 : Oid *pubrel_local_oids;
886 : ListCell *lc;
887 : int off;
888 : int remove_rel_len;
889 : int subrel_count;
890 70 : Relation rel = NULL;
891 : typedef struct SubRemoveRels
892 : {
893 : Oid relid;
894 : char state;
895 : } SubRemoveRels;
896 : SubRemoveRels *sub_remove_rels;
897 : WalReceiverConn *wrconn;
898 : bool must_use_password;
899 :
900 : /* Load the library providing us libpq calls. */
901 70 : load_file("libpqwalreceiver", false);
902 :
903 : /* Try to connect to the publisher. */
904 70 : must_use_password = sub->passwordrequired && !sub->ownersuperuser;
905 70 : wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
906 : sub->name, &err);
907 68 : if (!wrconn)
908 0 : ereport(ERROR,
909 : (errcode(ERRCODE_CONNECTION_FAILURE),
910 : errmsg("subscription \"%s\" could not connect to the publisher: %s",
911 : sub->name, err)));
912 :
913 68 : PG_TRY();
914 : {
915 68 : if (validate_publications)
916 18 : check_publications(wrconn, validate_publications);
917 :
918 : /* Get the table list from publisher. */
919 68 : pubrel_names = fetch_table_list(wrconn, sub->publications);
920 :
921 : /* Get local table list. */
922 68 : subrel_states = GetSubscriptionRelations(sub->oid, false);
923 68 : subrel_count = list_length(subrel_states);
924 :
925 : /*
926 : * Build qsorted array of local table oids for faster lookup. This can
927 : * potentially contain all tables in the database so speed of lookup
928 : * is important.
929 : */
930 68 : subrel_local_oids = palloc(subrel_count * sizeof(Oid));
931 68 : off = 0;
932 244 : foreach(lc, subrel_states)
933 : {
934 176 : SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
935 :
936 176 : subrel_local_oids[off++] = relstate->relid;
937 : }
938 68 : qsort(subrel_local_oids, subrel_count,
939 : sizeof(Oid), oid_cmp);
940 :
941 68 : check_publications_origin(wrconn, sub->publications, copy_data,
942 68 : sub->retaindeadtuples, sub->origin,
943 : subrel_local_oids, subrel_count, sub->name);
944 :
945 : /*
946 : * Rels that we want to remove from subscription and drop any slots
947 : * and origins corresponding to them.
948 : */
949 68 : sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels));
950 :
951 : /*
952 : * Walk over the remote tables and try to match them to locally known
953 : * tables. If the table is not known locally create a new state for
954 : * it.
955 : *
956 : * Also builds array of local oids of remote tables for the next step.
957 : */
958 68 : off = 0;
959 68 : pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
960 :
961 248 : foreach(lc, pubrel_names)
962 : {
963 180 : RangeVar *rv = (RangeVar *) lfirst(lc);
964 : Oid relid;
965 :
966 180 : relid = RangeVarGetRelid(rv, AccessShareLock, false);
967 :
968 : /* Check for supported relkind. */
969 180 : CheckSubscriptionRelkind(get_rel_relkind(relid),
970 180 : rv->schemaname, rv->relname);
971 :
972 180 : pubrel_local_oids[off++] = relid;
973 :
974 180 : if (!bsearch(&relid, subrel_local_oids,
975 : subrel_count, sizeof(Oid), oid_cmp))
976 : {
977 46 : AddSubscriptionRelState(sub->oid, relid,
978 : copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
979 : InvalidXLogRecPtr, true);
980 46 : ereport(DEBUG1,
981 : (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
982 : rv->schemaname, rv->relname, sub->name)));
983 : }
984 : }
985 :
986 : /*
987 : * Next remove state for tables we should not care about anymore using
988 : * the data we collected above
989 : */
990 68 : qsort(pubrel_local_oids, list_length(pubrel_names),
991 : sizeof(Oid), oid_cmp);
992 :
993 68 : remove_rel_len = 0;
994 244 : for (off = 0; off < subrel_count; off++)
995 : {
996 176 : Oid relid = subrel_local_oids[off];
997 :
998 176 : if (!bsearch(&relid, pubrel_local_oids,
999 176 : list_length(pubrel_names), sizeof(Oid), oid_cmp))
1000 : {
1001 : char state;
1002 : XLogRecPtr statelsn;
1003 :
1004 : /*
1005 : * Lock pg_subscription_rel with AccessExclusiveLock to
1006 : * prevent any race conditions with the apply worker
1007 : * re-launching workers at the same time this code is trying
1008 : * to remove those tables.
1009 : *
1010 : * Even if new worker for this particular rel is restarted it
1011 : * won't be able to make any progress as we hold exclusive
1012 : * lock on pg_subscription_rel till the transaction end. It
1013 : * will simply exit as there is no corresponding rel entry.
1014 : *
1015 : * This locking also ensures that the state of rels won't
1016 : * change till we are done with this refresh operation.
1017 : */
1018 42 : if (!rel)
1019 18 : rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
1020 :
1021 : /* Last known rel state. */
1022 42 : state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
1023 :
1024 42 : sub_remove_rels[remove_rel_len].relid = relid;
1025 42 : sub_remove_rels[remove_rel_len++].state = state;
1026 :
1027 42 : RemoveSubscriptionRel(sub->oid, relid);
1028 :
1029 42 : logicalrep_worker_stop(sub->oid, relid);
1030 :
1031 : /*
1032 : * For READY state, we would have already dropped the
1033 : * tablesync origin.
1034 : */
1035 42 : if (state != SUBREL_STATE_READY)
1036 : {
1037 : char originname[NAMEDATALEN];
1038 :
1039 : /*
1040 : * Drop the tablesync's origin tracking if exists.
1041 : *
1042 : * It is possible that the origin is not yet created for
1043 : * tablesync worker, this can happen for the states before
1044 : * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or
1045 : * apply worker can also concurrently try to drop the
1046 : * origin and by this time the origin might be already
1047 : * removed. For these reasons, passing missing_ok = true.
1048 : */
1049 0 : ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
1050 : sizeof(originname));
1051 0 : replorigin_drop_by_name(originname, true, false);
1052 : }
1053 :
1054 42 : ereport(DEBUG1,
1055 : (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
1056 : get_namespace_name(get_rel_namespace(relid)),
1057 : get_rel_name(relid),
1058 : sub->name)));
1059 : }
1060 : }
1061 :
1062 : /*
1063 : * Drop the tablesync slots associated with removed tables. This has
1064 : * to be at the end because otherwise if there is an error while doing
1065 : * the database operations we won't be able to rollback dropped slots.
1066 : */
1067 110 : for (off = 0; off < remove_rel_len; off++)
1068 : {
1069 42 : if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
1070 0 : sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
1071 : {
1072 0 : char syncslotname[NAMEDATALEN] = {0};
1073 :
1074 : /*
1075 : * For READY/SYNCDONE states we know the tablesync slot has
1076 : * already been dropped by the tablesync worker.
1077 : *
1078 : * For other states, there is no certainty, maybe the slot
1079 : * does not exist yet. Also, if we fail after removing some of
1080 : * the slots, next time, it will again try to drop already
1081 : * dropped slots and fail. For these reasons, we allow
1082 : * missing_ok = true for the drop.
1083 : */
1084 0 : ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid,
1085 : syncslotname, sizeof(syncslotname));
1086 0 : ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
1087 : }
1088 : }
1089 : }
1090 0 : PG_FINALLY();
1091 : {
1092 68 : walrcv_disconnect(wrconn);
1093 : }
1094 68 : PG_END_TRY();
1095 :
1096 68 : if (rel)
1097 18 : table_close(rel, NoLock);
1098 68 : }
1099 :
1100 : /*
1101 : * Common checks for altering failover, two_phase, and retain_dead_tuples
1102 : * options.
1103 : */
1104 : static void
1105 26 : CheckAlterSubOption(Subscription *sub, const char *option,
1106 : bool slot_needs_update, bool isTopLevel)
1107 : {
1108 : Assert(strcmp(option, "failover") == 0 ||
1109 : strcmp(option, "two_phase") == 0 ||
1110 : strcmp(option, "retain_dead_tuples") == 0);
1111 :
1112 : /*
1113 : * Altering the retain_dead_tuples option does not update the slot on the
1114 : * publisher.
1115 : */
1116 : Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0);
1117 :
1118 : /*
1119 : * Do not allow changing the option if the subscription is enabled. This
1120 : * is because both failover and two_phase options of the slot on the
1121 : * publisher cannot be modified if the slot is currently acquired by the
1122 : * existing walsender.
1123 : *
1124 : * Note that two_phase is enabled (aka changed from 'false' to 'true') on
1125 : * the publisher by the existing walsender, so we could have allowed that
1126 : * even when the subscription is enabled. But we kept this restriction for
1127 : * the sake of consistency and simplicity.
1128 : *
1129 : * Additionally, do not allow changing the retain_dead_tuples option when
1130 : * the subscription is enabled to prevent race conditions arising from the
1131 : * new option value being acknowledged asynchronously by the launcher and
1132 : * apply workers.
1133 : *
1134 : * Without the restriction, a race condition may arise when a user
1135 : * disables and immediately re-enables the retain_dead_tuples option. In
1136 : * this case, the launcher might drop the slot upon noticing the disabled
1137 : * action, while the apply worker may keep maintaining
1138 : * oldest_nonremovable_xid without noticing the option change. During this
1139 : * period, a transaction ID wraparound could falsely make this ID appear
1140 : * as if it originates from the future w.r.t the transaction ID stored in
1141 : * the slot maintained by launcher.
1142 : *
1143 : * Similarly, if the user enables retain_dead_tuples concurrently with the
1144 : * launcher starting the worker, the apply worker may start calculating
1145 : * oldest_nonremovable_xid before the launcher notices the enable action.
1146 : * Consequently, the launcher may update slot.xmin to a newer value than
1147 : * that maintained by the worker. In subsequent cycles, upon integrating
1148 : * the worker's oldest_nonremovable_xid, the launcher might detect a
1149 : * retreat in the calculated xmin, necessitating additional handling.
1150 : *
1151 : * XXX To address the above race conditions, we can define
1152 : * oldest_nonremovable_xid as FullTransactionID and adds the check to
1153 : * disallow retreating the conflict slot's xmin. For now, we kept the
1154 : * implementation simple by disallowing change to the retain_dead_tuples,
1155 : * but in the future we can change this after some more analysis.
1156 : *
1157 : * Note that we could restrict only the enabling of retain_dead_tuples to
1158 : * avoid the race conditions described above, but we maintain the
1159 : * restriction for both enable and disable operations for the sake of
1160 : * consistency.
1161 : */
1162 26 : if (sub->enabled)
1163 4 : ereport(ERROR,
1164 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1165 : errmsg("cannot set option \"%s\" for enabled subscription",
1166 : option)));
1167 :
1168 22 : if (slot_needs_update)
1169 : {
1170 : StringInfoData cmd;
1171 :
1172 : /*
1173 : * A valid slot must be associated with the subscription for us to
1174 : * modify any of the slot's properties.
1175 : */
1176 16 : if (!sub->slotname)
1177 0 : ereport(ERROR,
1178 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1179 : errmsg("cannot set option \"%s\" for a subscription that does not have a slot name",
1180 : option)));
1181 :
1182 : /* The changed option of the slot can't be rolled back. */
1183 16 : initStringInfo(&cmd);
1184 16 : appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
1185 :
1186 16 : PreventInTransactionBlock(isTopLevel, cmd.data);
1187 10 : pfree(cmd.data);
1188 : }
1189 16 : }
1190 :
1191 : /*
1192 : * Alter the existing subscription.
1193 : */
1194 : ObjectAddress
1195 520 : AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1196 : bool isTopLevel)
1197 : {
1198 : Relation rel;
1199 : ObjectAddress myself;
1200 : bool nulls[Natts_pg_subscription];
1201 : bool replaces[Natts_pg_subscription];
1202 : Datum values[Natts_pg_subscription];
1203 : HeapTuple tup;
1204 : Oid subid;
1205 520 : bool update_tuple = false;
1206 520 : bool update_failover = false;
1207 520 : bool update_two_phase = false;
1208 520 : bool check_pub_rdt = false;
1209 : bool retain_dead_tuples;
1210 : int max_retention;
1211 : bool retention_active;
1212 : char *origin;
1213 : Subscription *sub;
1214 : Form_pg_subscription form;
1215 : bits32 supported_opts;
1216 520 : SubOpts opts = {0};
1217 :
1218 520 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1219 :
1220 : /* Fetch the existing tuple. */
1221 520 : tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
1222 : CStringGetDatum(stmt->subname));
1223 :
1224 520 : if (!HeapTupleIsValid(tup))
1225 6 : ereport(ERROR,
1226 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1227 : errmsg("subscription \"%s\" does not exist",
1228 : stmt->subname)));
1229 :
1230 514 : form = (Form_pg_subscription) GETSTRUCT(tup);
1231 514 : subid = form->oid;
1232 :
1233 : /* must be owner */
1234 514 : if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
1235 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
1236 0 : stmt->subname);
1237 :
1238 514 : sub = GetSubscription(subid, false);
1239 :
1240 514 : retain_dead_tuples = sub->retaindeadtuples;
1241 514 : origin = sub->origin;
1242 514 : max_retention = sub->maxretention;
1243 514 : retention_active = sub->retentionactive;
1244 :
1245 : /*
1246 : * Don't allow non-superuser modification of a subscription with
1247 : * password_required=false.
1248 : */
1249 514 : if (!sub->passwordrequired && !superuser())
1250 0 : ereport(ERROR,
1251 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1252 : errmsg("password_required=false is superuser-only"),
1253 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1254 :
1255 : /* Lock the subscription so nobody else can do anything with it. */
1256 514 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
1257 :
1258 : /* Form a new tuple. */
1259 514 : memset(values, 0, sizeof(values));
1260 514 : memset(nulls, false, sizeof(nulls));
1261 514 : memset(replaces, false, sizeof(replaces));
1262 :
1263 514 : switch (stmt->kind)
1264 : {
1265 218 : case ALTER_SUBSCRIPTION_OPTIONS:
1266 : {
1267 218 : supported_opts = (SUBOPT_SLOT_NAME |
1268 : SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
1269 : SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
1270 : SUBOPT_DISABLE_ON_ERR |
1271 : SUBOPT_PASSWORD_REQUIRED |
1272 : SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
1273 : SUBOPT_RETAIN_DEAD_TUPLES |
1274 : SUBOPT_MAX_RETENTION_DURATION |
1275 : SUBOPT_ORIGIN);
1276 :
1277 218 : parse_subscription_options(pstate, stmt->options,
1278 : supported_opts, &opts);
1279 :
1280 200 : if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1281 : {
1282 : /*
1283 : * The subscription must be disabled to allow slot_name as
1284 : * 'none', otherwise, the apply worker will repeatedly try
1285 : * to stream the data using that slot_name which neither
1286 : * exists on the publisher nor the user will be allowed to
1287 : * create it.
1288 : */
1289 72 : if (sub->enabled && !opts.slot_name)
1290 0 : ereport(ERROR,
1291 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1292 : errmsg("cannot set %s for enabled subscription",
1293 : "slot_name = NONE")));
1294 :
1295 72 : if (opts.slot_name)
1296 6 : values[Anum_pg_subscription_subslotname - 1] =
1297 6 : DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
1298 : else
1299 66 : nulls[Anum_pg_subscription_subslotname - 1] = true;
1300 72 : replaces[Anum_pg_subscription_subslotname - 1] = true;
1301 : }
1302 :
1303 200 : if (opts.synchronous_commit)
1304 : {
1305 6 : values[Anum_pg_subscription_subsynccommit - 1] =
1306 6 : CStringGetTextDatum(opts.synchronous_commit);
1307 6 : replaces[Anum_pg_subscription_subsynccommit - 1] = true;
1308 : }
1309 :
1310 200 : if (IsSet(opts.specified_opts, SUBOPT_BINARY))
1311 : {
1312 18 : values[Anum_pg_subscription_subbinary - 1] =
1313 18 : BoolGetDatum(opts.binary);
1314 18 : replaces[Anum_pg_subscription_subbinary - 1] = true;
1315 : }
1316 :
1317 200 : if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
1318 : {
1319 30 : values[Anum_pg_subscription_substream - 1] =
1320 30 : CharGetDatum(opts.streaming);
1321 30 : replaces[Anum_pg_subscription_substream - 1] = true;
1322 : }
1323 :
1324 200 : if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
1325 : {
1326 : values[Anum_pg_subscription_subdisableonerr - 1]
1327 6 : = BoolGetDatum(opts.disableonerr);
1328 : replaces[Anum_pg_subscription_subdisableonerr - 1]
1329 6 : = true;
1330 : }
1331 :
1332 200 : if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
1333 : {
1334 : /* Non-superuser may not disable password_required. */
1335 12 : if (!opts.passwordrequired && !superuser())
1336 0 : ereport(ERROR,
1337 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1338 : errmsg("password_required=false is superuser-only"),
1339 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1340 :
1341 : values[Anum_pg_subscription_subpasswordrequired - 1]
1342 12 : = BoolGetDatum(opts.passwordrequired);
1343 : replaces[Anum_pg_subscription_subpasswordrequired - 1]
1344 12 : = true;
1345 : }
1346 :
1347 200 : if (IsSet(opts.specified_opts, SUBOPT_RUN_AS_OWNER))
1348 : {
1349 14 : values[Anum_pg_subscription_subrunasowner - 1] =
1350 14 : BoolGetDatum(opts.runasowner);
1351 14 : replaces[Anum_pg_subscription_subrunasowner - 1] = true;
1352 : }
1353 :
1354 200 : if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
1355 : {
1356 : /*
1357 : * We need to update both the slot and the subscription
1358 : * for the two_phase option. We can enable the two_phase
1359 : * option for a slot only once the initial data
1360 : * synchronization is done. This is to avoid missing some
1361 : * data as explained in comments atop worker.c.
1362 : */
1363 6 : update_two_phase = !opts.twophase;
1364 :
1365 6 : CheckAlterSubOption(sub, "two_phase", update_two_phase,
1366 : isTopLevel);
1367 :
1368 : /*
1369 : * Modifying the two_phase slot option requires a slot
1370 : * lookup by slot name, so changing the slot name at the
1371 : * same time is not allowed.
1372 : */
1373 6 : if (update_two_phase &&
1374 2 : IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1375 0 : ereport(ERROR,
1376 : (errcode(ERRCODE_SYNTAX_ERROR),
1377 : errmsg("\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
1378 :
1379 : /*
1380 : * Note that workers may still survive even if the
1381 : * subscription has been disabled.
1382 : *
1383 : * Ensure workers have already been exited to avoid
1384 : * getting prepared transactions while we are disabling
1385 : * the two_phase option. Otherwise, the changes of an
1386 : * already prepared transaction can be replicated again
1387 : * along with its corresponding commit, leading to
1388 : * duplicate data or errors.
1389 : */
1390 6 : if (logicalrep_workers_find(subid, true, true))
1391 0 : ereport(ERROR,
1392 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1393 : errmsg("cannot alter \"two_phase\" when logical replication worker is still running"),
1394 : errhint("Try again after some time.")));
1395 :
1396 : /*
1397 : * two_phase cannot be disabled if there are any
1398 : * uncommitted prepared transactions present otherwise it
1399 : * can lead to duplicate data or errors as explained in
1400 : * the comment above.
1401 : */
1402 6 : if (update_two_phase &&
1403 2 : sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
1404 2 : LookupGXactBySubid(subid))
1405 0 : ereport(ERROR,
1406 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1407 : errmsg("cannot disable \"two_phase\" when prepared transactions exist"),
1408 : errhint("Resolve these transactions and try again.")));
1409 :
1410 : /* Change system catalog accordingly */
1411 6 : values[Anum_pg_subscription_subtwophasestate - 1] =
1412 6 : CharGetDatum(opts.twophase ?
1413 : LOGICALREP_TWOPHASE_STATE_PENDING :
1414 : LOGICALREP_TWOPHASE_STATE_DISABLED);
1415 6 : replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1416 : }
1417 :
1418 200 : if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
1419 : {
1420 : /*
1421 : * Similar to the two_phase case above, we need to update
1422 : * the failover option for both the slot and the
1423 : * subscription.
1424 : */
1425 16 : update_failover = true;
1426 :
1427 16 : CheckAlterSubOption(sub, "failover", update_failover,
1428 : isTopLevel);
1429 :
1430 8 : values[Anum_pg_subscription_subfailover - 1] =
1431 8 : BoolGetDatum(opts.failover);
1432 8 : replaces[Anum_pg_subscription_subfailover - 1] = true;
1433 : }
1434 :
1435 192 : if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
1436 : {
1437 4 : values[Anum_pg_subscription_subretaindeadtuples - 1] =
1438 4 : BoolGetDatum(opts.retaindeadtuples);
1439 4 : replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true;
1440 :
1441 : /*
1442 : * Update the retention status only if there's a change in
1443 : * the retain_dead_tuples option value.
1444 : *
1445 : * Automatically marking retention as active when
1446 : * retain_dead_tuples is enabled may not always be ideal,
1447 : * especially if retention was previously stopped and the
1448 : * user toggles retain_dead_tuples without adjusting the
1449 : * publisher workload. However, this behavior provides a
1450 : * convenient way for users to manually refresh the
1451 : * retention status. Since retention will be stopped again
1452 : * unless the publisher workload is reduced, this approach
1453 : * is acceptable for now.
1454 : */
1455 4 : if (opts.retaindeadtuples != sub->retaindeadtuples)
1456 : {
1457 4 : values[Anum_pg_subscription_subretentionactive - 1] =
1458 4 : BoolGetDatum(opts.retaindeadtuples);
1459 4 : replaces[Anum_pg_subscription_subretentionactive - 1] = true;
1460 :
1461 4 : retention_active = opts.retaindeadtuples;
1462 : }
1463 :
1464 4 : CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
1465 :
1466 : /*
1467 : * Workers may continue running even after the
1468 : * subscription has been disabled.
1469 : *
1470 : * To prevent race conditions (as described in
1471 : * CheckAlterSubOption()), ensure that all worker
1472 : * processes have already exited before proceeding.
1473 : */
1474 2 : if (logicalrep_workers_find(subid, true, true))
1475 0 : ereport(ERROR,
1476 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1477 : errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
1478 : errhint("Try again after some time.")));
1479 :
1480 : /*
1481 : * Notify the launcher to manage the replication slot for
1482 : * conflict detection. This ensures that replication slot
1483 : * is efficiently handled (created, updated, or dropped)
1484 : * in response to any configuration changes.
1485 : */
1486 2 : ApplyLauncherWakeupAtCommit();
1487 :
1488 2 : check_pub_rdt = opts.retaindeadtuples;
1489 2 : retain_dead_tuples = opts.retaindeadtuples;
1490 : }
1491 :
1492 190 : if (IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1493 : {
1494 10 : values[Anum_pg_subscription_submaxretention - 1] =
1495 10 : Int32GetDatum(opts.maxretention);
1496 10 : replaces[Anum_pg_subscription_submaxretention - 1] = true;
1497 :
1498 10 : max_retention = opts.maxretention;
1499 : }
1500 :
1501 : /*
1502 : * Ensure that system configuration paramters are set
1503 : * appropriately to support retain_dead_tuples and
1504 : * max_retention_duration.
1505 : */
1506 190 : if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) ||
1507 188 : IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1508 12 : CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE,
1509 : retain_dead_tuples,
1510 : retention_active,
1511 12 : (max_retention > 0));
1512 :
1513 190 : if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
1514 : {
1515 10 : values[Anum_pg_subscription_suborigin - 1] =
1516 10 : CStringGetTextDatum(opts.origin);
1517 10 : replaces[Anum_pg_subscription_suborigin - 1] = true;
1518 :
1519 : /*
1520 : * Check if changes from different origins may be received
1521 : * from the publisher when the origin is changed to ANY
1522 : * and retain_dead_tuples is enabled.
1523 : */
1524 14 : check_pub_rdt = retain_dead_tuples &&
1525 4 : pg_strcasecmp(opts.origin, LOGICALREP_ORIGIN_ANY) == 0;
1526 :
1527 10 : origin = opts.origin;
1528 : }
1529 :
1530 190 : update_tuple = true;
1531 190 : break;
1532 : }
1533 :
1534 104 : case ALTER_SUBSCRIPTION_ENABLED:
1535 : {
1536 104 : parse_subscription_options(pstate, stmt->options,
1537 : SUBOPT_ENABLED, &opts);
1538 : Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
1539 :
1540 104 : if (!sub->slotname && opts.enabled)
1541 6 : ereport(ERROR,
1542 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1543 : errmsg("cannot enable subscription that does not have a slot name")));
1544 :
1545 : /*
1546 : * Check track_commit_timestamp only when enabling the
1547 : * subscription in case it was disabled after creation. See
1548 : * comments atop CheckSubDeadTupleRetention() for details.
1549 : */
1550 98 : CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
1551 98 : WARNING, sub->retaindeadtuples,
1552 98 : sub->retentionactive, false);
1553 :
1554 98 : values[Anum_pg_subscription_subenabled - 1] =
1555 98 : BoolGetDatum(opts.enabled);
1556 98 : replaces[Anum_pg_subscription_subenabled - 1] = true;
1557 :
1558 98 : if (opts.enabled)
1559 54 : ApplyLauncherWakeupAtCommit();
1560 :
1561 98 : update_tuple = true;
1562 :
1563 : /*
1564 : * The subscription might be initially created with
1565 : * connect=false and retain_dead_tuples=true, meaning the
1566 : * remote server's status may not be checked. Ensure this
1567 : * check is conducted now.
1568 : */
1569 98 : check_pub_rdt = sub->retaindeadtuples && opts.enabled;
1570 98 : break;
1571 : }
1572 :
1573 20 : case ALTER_SUBSCRIPTION_CONNECTION:
1574 : /* Load the library providing us libpq calls. */
1575 20 : load_file("libpqwalreceiver", false);
1576 : /* Check the connection info string. */
1577 20 : walrcv_check_conninfo(stmt->conninfo,
1578 : sub->passwordrequired && !sub->ownersuperuser);
1579 :
1580 14 : values[Anum_pg_subscription_subconninfo - 1] =
1581 14 : CStringGetTextDatum(stmt->conninfo);
1582 14 : replaces[Anum_pg_subscription_subconninfo - 1] = true;
1583 14 : update_tuple = true;
1584 :
1585 : /*
1586 : * Since the remote server configuration might have changed,
1587 : * perform a check to ensure it permits enabling
1588 : * retain_dead_tuples.
1589 : */
1590 14 : check_pub_rdt = sub->retaindeadtuples;
1591 14 : break;
1592 :
1593 32 : case ALTER_SUBSCRIPTION_SET_PUBLICATION:
1594 : {
1595 32 : supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
1596 32 : parse_subscription_options(pstate, stmt->options,
1597 : supported_opts, &opts);
1598 :
1599 32 : values[Anum_pg_subscription_subpublications - 1] =
1600 32 : publicationListToArray(stmt->publication);
1601 32 : replaces[Anum_pg_subscription_subpublications - 1] = true;
1602 :
1603 32 : update_tuple = true;
1604 :
1605 : /* Refresh if user asked us to. */
1606 32 : if (opts.refresh)
1607 : {
1608 26 : if (!sub->enabled)
1609 0 : ereport(ERROR,
1610 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1611 : errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1612 : errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
1613 :
1614 : /*
1615 : * See ALTER_SUBSCRIPTION_REFRESH for details why this is
1616 : * not allowed.
1617 : */
1618 26 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1619 0 : ereport(ERROR,
1620 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1621 : errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1622 : errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1623 :
1624 26 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1625 :
1626 : /* Make sure refresh sees the new list of publications. */
1627 14 : sub->publications = stmt->publication;
1628 :
1629 14 : AlterSubscription_refresh(sub, opts.copy_data,
1630 : stmt->publication);
1631 : }
1632 :
1633 20 : break;
1634 : }
1635 :
1636 54 : case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
1637 : case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
1638 : {
1639 : List *publist;
1640 54 : bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
1641 :
1642 54 : supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
1643 54 : parse_subscription_options(pstate, stmt->options,
1644 : supported_opts, &opts);
1645 :
1646 54 : publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
1647 18 : values[Anum_pg_subscription_subpublications - 1] =
1648 18 : publicationListToArray(publist);
1649 18 : replaces[Anum_pg_subscription_subpublications - 1] = true;
1650 :
1651 18 : update_tuple = true;
1652 :
1653 : /* Refresh if user asked us to. */
1654 18 : if (opts.refresh)
1655 : {
1656 : /* We only need to validate user specified publications. */
1657 6 : List *validate_publications = (isadd) ? stmt->publication : NULL;
1658 :
1659 6 : if (!sub->enabled)
1660 0 : ereport(ERROR,
1661 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1662 : errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1663 : /* translator: %s is an SQL ALTER command */
1664 : errhint("Use %s instead.",
1665 : isadd ?
1666 : "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
1667 : "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
1668 :
1669 : /*
1670 : * See ALTER_SUBSCRIPTION_REFRESH for details why this is
1671 : * not allowed.
1672 : */
1673 6 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1674 0 : ereport(ERROR,
1675 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1676 : errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1677 : /* translator: %s is an SQL ALTER command */
1678 : errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
1679 : isadd ?
1680 : "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
1681 : "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
1682 :
1683 6 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1684 :
1685 : /* Refresh the new list of publications. */
1686 6 : sub->publications = publist;
1687 :
1688 6 : AlterSubscription_refresh(sub, opts.copy_data,
1689 : validate_publications);
1690 : }
1691 :
1692 18 : break;
1693 : }
1694 :
1695 62 : case ALTER_SUBSCRIPTION_REFRESH:
1696 : {
1697 62 : if (!sub->enabled)
1698 6 : ereport(ERROR,
1699 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1700 : errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
1701 :
1702 56 : parse_subscription_options(pstate, stmt->options,
1703 : SUBOPT_COPY_DATA, &opts);
1704 :
1705 : /*
1706 : * The subscription option "two_phase" requires that
1707 : * replication has passed the initial table synchronization
1708 : * phase before the two_phase becomes properly enabled.
1709 : *
1710 : * But, having reached this two-phase commit "enabled" state
1711 : * we must not allow any subsequent table initialization to
1712 : * occur. So the ALTER SUBSCRIPTION ... REFRESH is disallowed
1713 : * when the user had requested two_phase = on mode.
1714 : *
1715 : * The exception to this restriction is when copy_data =
1716 : * false, because when copy_data is false the tablesync will
1717 : * start already in READY state and will exit directly without
1718 : * doing anything.
1719 : *
1720 : * For more details see comments atop worker.c.
1721 : */
1722 56 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1723 0 : ereport(ERROR,
1724 : (errcode(ERRCODE_SYNTAX_ERROR),
1725 : errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"),
1726 : errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1727 :
1728 56 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
1729 :
1730 50 : AlterSubscription_refresh(sub, opts.copy_data, NULL);
1731 :
1732 48 : break;
1733 : }
1734 :
1735 24 : case ALTER_SUBSCRIPTION_SKIP:
1736 : {
1737 24 : parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
1738 :
1739 : /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
1740 : Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
1741 :
1742 : /*
1743 : * If the user sets subskiplsn, we do a sanity check to make
1744 : * sure that the specified LSN is a probable value.
1745 : */
1746 18 : if (!XLogRecPtrIsInvalid(opts.lsn))
1747 : {
1748 : RepOriginId originid;
1749 : char originname[NAMEDATALEN];
1750 : XLogRecPtr remote_lsn;
1751 :
1752 12 : ReplicationOriginNameForLogicalRep(subid, InvalidOid,
1753 : originname, sizeof(originname));
1754 12 : originid = replorigin_by_name(originname, false);
1755 12 : remote_lsn = replorigin_get_progress(originid, false);
1756 :
1757 : /* Check the given LSN is at least a future LSN */
1758 12 : if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn)
1759 0 : ereport(ERROR,
1760 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1761 : errmsg("skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
1762 : LSN_FORMAT_ARGS(opts.lsn),
1763 : LSN_FORMAT_ARGS(remote_lsn))));
1764 : }
1765 :
1766 18 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
1767 18 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
1768 :
1769 18 : update_tuple = true;
1770 18 : break;
1771 : }
1772 :
1773 0 : default:
1774 0 : elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
1775 : stmt->kind);
1776 : }
1777 :
1778 : /* Update the catalog if needed. */
1779 406 : if (update_tuple)
1780 : {
1781 358 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1782 : replaces);
1783 :
1784 358 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1785 :
1786 358 : heap_freetuple(tup);
1787 : }
1788 :
1789 : /*
1790 : * Try to acquire the connection necessary either for modifying the slot
1791 : * or for checking if the remote server permits enabling
1792 : * retain_dead_tuples.
1793 : *
1794 : * This has to be at the end because otherwise if there is an error while
1795 : * doing the database operations we won't be able to rollback altered
1796 : * slot.
1797 : */
1798 406 : if (update_failover || update_two_phase || check_pub_rdt)
1799 : {
1800 : bool must_use_password;
1801 : char *err;
1802 : WalReceiverConn *wrconn;
1803 :
1804 : /* Load the library providing us libpq calls. */
1805 26 : load_file("libpqwalreceiver", false);
1806 :
1807 : /*
1808 : * Try to connect to the publisher, using the new connection string if
1809 : * available.
1810 : */
1811 26 : must_use_password = sub->passwordrequired && !sub->ownersuperuser;
1812 26 : wrconn = walrcv_connect(stmt->conninfo ? stmt->conninfo : sub->conninfo,
1813 : true, true, must_use_password, sub->name,
1814 : &err);
1815 26 : if (!wrconn)
1816 0 : ereport(ERROR,
1817 : (errcode(ERRCODE_CONNECTION_FAILURE),
1818 : errmsg("subscription \"%s\" could not connect to the publisher: %s",
1819 : sub->name, err)));
1820 :
1821 26 : PG_TRY();
1822 : {
1823 26 : if (retain_dead_tuples)
1824 18 : check_pub_dead_tuple_retention(wrconn);
1825 :
1826 26 : check_publications_origin(wrconn, sub->publications, false,
1827 : retain_dead_tuples, origin, NULL, 0,
1828 : sub->name);
1829 :
1830 26 : if (update_failover || update_two_phase)
1831 10 : walrcv_alter_slot(wrconn, sub->slotname,
1832 : update_failover ? &opts.failover : NULL,
1833 : update_two_phase ? &opts.twophase : NULL);
1834 : }
1835 0 : PG_FINALLY();
1836 : {
1837 26 : walrcv_disconnect(wrconn);
1838 : }
1839 26 : PG_END_TRY();
1840 : }
1841 :
1842 406 : table_close(rel, RowExclusiveLock);
1843 :
1844 406 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
1845 :
1846 406 : InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
1847 :
1848 : /* Wake up related replication workers to handle this change quickly. */
1849 406 : LogicalRepWorkersWakeupAtCommit(subid);
1850 :
1851 406 : return myself;
1852 : }
1853 :
1854 : /*
1855 : * Drop a subscription
1856 : */
1857 : void
1858 246 : DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
1859 : {
1860 : Relation rel;
1861 : ObjectAddress myself;
1862 : HeapTuple tup;
1863 : Oid subid;
1864 : Oid subowner;
1865 : Datum datum;
1866 : bool isnull;
1867 : char *subname;
1868 : char *conninfo;
1869 : char *slotname;
1870 : List *subworkers;
1871 : ListCell *lc;
1872 : char originname[NAMEDATALEN];
1873 246 : char *err = NULL;
1874 : WalReceiverConn *wrconn;
1875 : Form_pg_subscription form;
1876 : List *rstates;
1877 : bool must_use_password;
1878 :
1879 : /*
1880 : * The launcher may concurrently start a new worker for this subscription.
1881 : * During initialization, the worker checks for subscription validity and
1882 : * exits if the subscription has already been dropped. See
1883 : * InitializeLogRepWorker.
1884 : */
1885 246 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1886 :
1887 246 : tup = SearchSysCache2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
1888 246 : CStringGetDatum(stmt->subname));
1889 :
1890 246 : if (!HeapTupleIsValid(tup))
1891 : {
1892 12 : table_close(rel, NoLock);
1893 :
1894 12 : if (!stmt->missing_ok)
1895 6 : ereport(ERROR,
1896 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1897 : errmsg("subscription \"%s\" does not exist",
1898 : stmt->subname)));
1899 : else
1900 6 : ereport(NOTICE,
1901 : (errmsg("subscription \"%s\" does not exist, skipping",
1902 : stmt->subname)));
1903 :
1904 90 : return;
1905 : }
1906 :
1907 234 : form = (Form_pg_subscription) GETSTRUCT(tup);
1908 234 : subid = form->oid;
1909 234 : subowner = form->subowner;
1910 234 : must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
1911 :
1912 : /* must be owner */
1913 234 : if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
1914 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
1915 0 : stmt->subname);
1916 :
1917 : /* DROP hook for the subscription being removed */
1918 234 : InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
1919 :
1920 : /*
1921 : * Lock the subscription so nobody else can do anything with it (including
1922 : * the replication workers).
1923 : */
1924 234 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
1925 :
1926 : /* Get subname */
1927 234 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
1928 : Anum_pg_subscription_subname);
1929 234 : subname = pstrdup(NameStr(*DatumGetName(datum)));
1930 :
1931 : /* Get conninfo */
1932 234 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
1933 : Anum_pg_subscription_subconninfo);
1934 234 : conninfo = TextDatumGetCString(datum);
1935 :
1936 : /* Get slotname */
1937 234 : datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
1938 : Anum_pg_subscription_subslotname, &isnull);
1939 234 : if (!isnull)
1940 148 : slotname = pstrdup(NameStr(*DatumGetName(datum)));
1941 : else
1942 86 : slotname = NULL;
1943 :
1944 : /*
1945 : * Since dropping a replication slot is not transactional, the replication
1946 : * slot stays dropped even if the transaction rolls back. So we cannot
1947 : * run DROP SUBSCRIPTION inside a transaction block if dropping the
1948 : * replication slot. Also, in this case, we report a message for dropping
1949 : * the subscription to the cumulative stats system.
1950 : *
1951 : * XXX The command name should really be something like "DROP SUBSCRIPTION
1952 : * of a subscription that is associated with a replication slot", but we
1953 : * don't have the proper facilities for that.
1954 : */
1955 234 : if (slotname)
1956 148 : PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
1957 :
1958 228 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
1959 228 : EventTriggerSQLDropAddObject(&myself, true, true);
1960 :
1961 : /* Remove the tuple from catalog. */
1962 228 : CatalogTupleDelete(rel, &tup->t_self);
1963 :
1964 228 : ReleaseSysCache(tup);
1965 :
1966 : /*
1967 : * Stop all the subscription workers immediately.
1968 : *
1969 : * This is necessary if we are dropping the replication slot, so that the
1970 : * slot becomes accessible.
1971 : *
1972 : * It is also necessary if the subscription is disabled and was disabled
1973 : * in the same transaction. Then the workers haven't seen the disabling
1974 : * yet and will still be running, leading to hangs later when we want to
1975 : * drop the replication origin. If the subscription was disabled before
1976 : * this transaction, then there shouldn't be any workers left, so this
1977 : * won't make a difference.
1978 : *
1979 : * New workers won't be started because we hold an exclusive lock on the
1980 : * subscription till the end of the transaction.
1981 : */
1982 228 : subworkers = logicalrep_workers_find(subid, false, true);
1983 372 : foreach(lc, subworkers)
1984 : {
1985 144 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
1986 :
1987 144 : logicalrep_worker_stop(w->subid, w->relid);
1988 : }
1989 228 : list_free(subworkers);
1990 :
1991 : /*
1992 : * Remove the no-longer-useful entry in the launcher's table of apply
1993 : * worker start times.
1994 : *
1995 : * If this transaction rolls back, the launcher might restart a failed
1996 : * apply worker before wal_retrieve_retry_interval milliseconds have
1997 : * elapsed, but that's pretty harmless.
1998 : */
1999 228 : ApplyLauncherForgetWorkerStartTime(subid);
2000 :
2001 : /*
2002 : * Cleanup of tablesync replication origins.
2003 : *
2004 : * Any READY-state relations would already have dealt with clean-ups.
2005 : *
2006 : * Note that the state can't change because we have already stopped both
2007 : * the apply and tablesync workers and they can't restart because of
2008 : * exclusive lock on the subscription.
2009 : */
2010 228 : rstates = GetSubscriptionRelations(subid, true);
2011 240 : foreach(lc, rstates)
2012 : {
2013 12 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
2014 12 : Oid relid = rstate->relid;
2015 :
2016 : /* Only cleanup resources of tablesync workers */
2017 12 : if (!OidIsValid(relid))
2018 0 : continue;
2019 :
2020 : /*
2021 : * Drop the tablesync's origin tracking if exists.
2022 : *
2023 : * It is possible that the origin is not yet created for tablesync
2024 : * worker so passing missing_ok = true. This can happen for the states
2025 : * before SUBREL_STATE_FINISHEDCOPY.
2026 : */
2027 12 : ReplicationOriginNameForLogicalRep(subid, relid, originname,
2028 : sizeof(originname));
2029 12 : replorigin_drop_by_name(originname, true, false);
2030 : }
2031 :
2032 : /* Clean up dependencies */
2033 228 : deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
2034 :
2035 : /* Remove any associated relation synchronization states. */
2036 228 : RemoveSubscriptionRel(subid, InvalidOid);
2037 :
2038 : /* Remove the origin tracking if exists. */
2039 228 : ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
2040 228 : replorigin_drop_by_name(originname, true, false);
2041 :
2042 : /*
2043 : * Tell the cumulative stats system that the subscription is getting
2044 : * dropped.
2045 : */
2046 228 : pgstat_drop_subscription(subid);
2047 :
2048 : /*
2049 : * If there is no slot associated with the subscription, we can finish
2050 : * here.
2051 : */
2052 228 : if (!slotname && rstates == NIL)
2053 : {
2054 84 : table_close(rel, NoLock);
2055 84 : return;
2056 : }
2057 :
2058 : /*
2059 : * Try to acquire the connection necessary for dropping slots.
2060 : *
2061 : * Note: If the slotname is NONE/NULL then we allow the command to finish
2062 : * and users need to manually cleanup the apply and tablesync worker slots
2063 : * later.
2064 : *
2065 : * This has to be at the end because otherwise if there is an error while
2066 : * doing the database operations we won't be able to rollback dropped
2067 : * slot.
2068 : */
2069 144 : load_file("libpqwalreceiver", false);
2070 :
2071 144 : wrconn = walrcv_connect(conninfo, true, true, must_use_password,
2072 : subname, &err);
2073 144 : if (wrconn == NULL)
2074 : {
2075 0 : if (!slotname)
2076 : {
2077 : /* be tidy */
2078 0 : list_free(rstates);
2079 0 : table_close(rel, NoLock);
2080 0 : return;
2081 : }
2082 : else
2083 : {
2084 0 : ReportSlotConnectionError(rstates, subid, slotname, err);
2085 : }
2086 : }
2087 :
2088 144 : PG_TRY();
2089 : {
2090 156 : foreach(lc, rstates)
2091 : {
2092 12 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
2093 12 : Oid relid = rstate->relid;
2094 :
2095 : /* Only cleanup resources of tablesync workers */
2096 12 : if (!OidIsValid(relid))
2097 0 : continue;
2098 :
2099 : /*
2100 : * Drop the tablesync slots associated with removed tables.
2101 : *
2102 : * For SYNCDONE/READY states, the tablesync slot is known to have
2103 : * already been dropped by the tablesync worker.
2104 : *
2105 : * For other states, there is no certainty, maybe the slot does
2106 : * not exist yet. Also, if we fail after removing some of the
2107 : * slots, next time, it will again try to drop already dropped
2108 : * slots and fail. For these reasons, we allow missing_ok = true
2109 : * for the drop.
2110 : */
2111 12 : if (rstate->state != SUBREL_STATE_SYNCDONE)
2112 : {
2113 10 : char syncslotname[NAMEDATALEN] = {0};
2114 :
2115 10 : ReplicationSlotNameForTablesync(subid, relid, syncslotname,
2116 : sizeof(syncslotname));
2117 10 : ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
2118 : }
2119 : }
2120 :
2121 144 : list_free(rstates);
2122 :
2123 : /*
2124 : * If there is a slot associated with the subscription, then drop the
2125 : * replication slot at the publisher.
2126 : */
2127 144 : if (slotname)
2128 142 : ReplicationSlotDropAtPubNode(wrconn, slotname, false);
2129 : }
2130 2 : PG_FINALLY();
2131 : {
2132 144 : walrcv_disconnect(wrconn);
2133 : }
2134 144 : PG_END_TRY();
2135 :
2136 142 : table_close(rel, NoLock);
2137 : }
2138 :
2139 : /*
2140 : * Drop the replication slot at the publisher node using the replication
2141 : * connection.
2142 : *
2143 : * missing_ok - if true then only issue a LOG message if the slot doesn't
2144 : * exist.
2145 : */
2146 : void
2147 530 : ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
2148 : {
2149 : StringInfoData cmd;
2150 :
2151 : Assert(wrconn);
2152 :
2153 530 : load_file("libpqwalreceiver", false);
2154 :
2155 530 : initStringInfo(&cmd);
2156 530 : appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
2157 :
2158 530 : PG_TRY();
2159 : {
2160 : WalRcvExecResult *res;
2161 :
2162 530 : res = walrcv_exec(wrconn, cmd.data, 0, NULL);
2163 :
2164 530 : if (res->status == WALRCV_OK_COMMAND)
2165 : {
2166 : /* NOTICE. Success. */
2167 524 : ereport(NOTICE,
2168 : (errmsg("dropped replication slot \"%s\" on publisher",
2169 : slotname)));
2170 : }
2171 6 : else if (res->status == WALRCV_ERROR &&
2172 4 : missing_ok &&
2173 4 : res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
2174 : {
2175 : /* LOG. Error, but missing_ok = true. */
2176 4 : ereport(LOG,
2177 : (errmsg("could not drop replication slot \"%s\" on publisher: %s",
2178 : slotname, res->err)));
2179 : }
2180 : else
2181 : {
2182 : /* ERROR. */
2183 2 : ereport(ERROR,
2184 : (errcode(ERRCODE_CONNECTION_FAILURE),
2185 : errmsg("could not drop replication slot \"%s\" on publisher: %s",
2186 : slotname, res->err)));
2187 : }
2188 :
2189 528 : walrcv_clear_result(res);
2190 : }
2191 2 : PG_FINALLY();
2192 : {
2193 530 : pfree(cmd.data);
2194 : }
2195 530 : PG_END_TRY();
2196 528 : }
2197 :
2198 : /*
2199 : * Internal workhorse for changing a subscription owner
2200 : */
2201 : static void
2202 18 : AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
2203 : {
2204 : Form_pg_subscription form;
2205 : AclResult aclresult;
2206 :
2207 18 : form = (Form_pg_subscription) GETSTRUCT(tup);
2208 :
2209 18 : if (form->subowner == newOwnerId)
2210 4 : return;
2211 :
2212 14 : if (!object_ownercheck(SubscriptionRelationId, form->oid, GetUserId()))
2213 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
2214 0 : NameStr(form->subname));
2215 :
2216 : /*
2217 : * Don't allow non-superuser modification of a subscription with
2218 : * password_required=false.
2219 : */
2220 14 : if (!form->subpasswordrequired && !superuser())
2221 0 : ereport(ERROR,
2222 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2223 : errmsg("password_required=false is superuser-only"),
2224 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
2225 :
2226 : /* Must be able to become new owner */
2227 14 : check_can_set_role(GetUserId(), newOwnerId);
2228 :
2229 : /*
2230 : * current owner must have CREATE on database
2231 : *
2232 : * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
2233 : * other object types behave differently (e.g. you can't give a table to a
2234 : * user who lacks CREATE privileges on a schema).
2235 : */
2236 8 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
2237 : GetUserId(), ACL_CREATE);
2238 8 : if (aclresult != ACLCHECK_OK)
2239 0 : aclcheck_error(aclresult, OBJECT_DATABASE,
2240 0 : get_database_name(MyDatabaseId));
2241 :
2242 8 : form->subowner = newOwnerId;
2243 8 : CatalogTupleUpdate(rel, &tup->t_self, tup);
2244 :
2245 : /* Update owner dependency reference */
2246 8 : changeDependencyOnOwner(SubscriptionRelationId,
2247 : form->oid,
2248 : newOwnerId);
2249 :
2250 8 : InvokeObjectPostAlterHook(SubscriptionRelationId,
2251 : form->oid, 0);
2252 :
2253 : /* Wake up related background processes to handle this change quickly. */
2254 8 : ApplyLauncherWakeupAtCommit();
2255 8 : LogicalRepWorkersWakeupAtCommit(form->oid);
2256 : }
2257 :
2258 : /*
2259 : * Change subscription owner -- by name
2260 : */
2261 : ObjectAddress
2262 18 : AlterSubscriptionOwner(const char *name, Oid newOwnerId)
2263 : {
2264 : Oid subid;
2265 : HeapTuple tup;
2266 : Relation rel;
2267 : ObjectAddress address;
2268 : Form_pg_subscription form;
2269 :
2270 18 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
2271 :
2272 18 : tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
2273 : CStringGetDatum(name));
2274 :
2275 18 : if (!HeapTupleIsValid(tup))
2276 0 : ereport(ERROR,
2277 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2278 : errmsg("subscription \"%s\" does not exist", name)));
2279 :
2280 18 : form = (Form_pg_subscription) GETSTRUCT(tup);
2281 18 : subid = form->oid;
2282 :
2283 18 : AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
2284 :
2285 12 : ObjectAddressSet(address, SubscriptionRelationId, subid);
2286 :
2287 12 : heap_freetuple(tup);
2288 :
2289 12 : table_close(rel, RowExclusiveLock);
2290 :
2291 12 : return address;
2292 : }
2293 :
2294 : /*
2295 : * Change subscription owner -- by OID
2296 : */
2297 : void
2298 0 : AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
2299 : {
2300 : HeapTuple tup;
2301 : Relation rel;
2302 :
2303 0 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
2304 :
2305 0 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
2306 :
2307 0 : if (!HeapTupleIsValid(tup))
2308 0 : ereport(ERROR,
2309 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2310 : errmsg("subscription with OID %u does not exist", subid)));
2311 :
2312 0 : AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
2313 :
2314 0 : heap_freetuple(tup);
2315 :
2316 0 : table_close(rel, RowExclusiveLock);
2317 0 : }
2318 :
2319 : /*
2320 : * Check and log a warning if the publisher has subscribed to the same table,
2321 : * its partition ancestors (if it's a partition), or its partition children (if
2322 : * it's a partitioned table), from some other publishers. This check is
2323 : * required in the following scenarios:
2324 : *
2325 : * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements
2326 : * with "copy_data = true" and "origin = none":
2327 : * - Warn the user that data with an origin might have been copied.
2328 : * - This check is skipped for tables already added, as incremental sync via
2329 : * WAL allows origin tracking. The list of such tables is in
2330 : * subrel_local_oids.
2331 : *
2332 : * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements
2333 : * with "retain_dead_tuples = true" and "origin = any", and for ALTER
2334 : * SUBSCRIPTION statements that modify retain_dead_tuples or origin, or
2335 : * when the publisher's status changes (e.g., due to a connection string
2336 : * update):
2337 : * - Warn the user that only conflict detection info for local changes on
2338 : * the publisher is retained. Data from other origins may lack sufficient
2339 : * details for reliable conflict detection.
2340 : * - See comments atop worker.c for more details.
2341 : */
2342 : static void
2343 330 : check_publications_origin(WalReceiverConn *wrconn, List *publications,
2344 : bool copydata, bool retain_dead_tuples,
2345 : char *origin, Oid *subrel_local_oids,
2346 : int subrel_count, char *subname)
2347 : {
2348 : WalRcvExecResult *res;
2349 : StringInfoData cmd;
2350 : TupleTableSlot *slot;
2351 330 : Oid tableRow[1] = {TEXTOID};
2352 330 : List *publist = NIL;
2353 : int i;
2354 : bool check_rdt;
2355 : bool check_table_sync;
2356 660 : bool origin_none = origin &&
2357 330 : pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0;
2358 :
2359 : /*
2360 : * Enable retain_dead_tuples checks only when origin is set to 'any',
2361 : * since with origin='none' only local changes are replicated to the
2362 : * subscriber.
2363 : */
2364 330 : check_rdt = retain_dead_tuples && !origin_none;
2365 :
2366 : /*
2367 : * Enable table synchronization checks only when origin is 'none', to
2368 : * ensure that data from other origins is not inadvertently copied.
2369 : */
2370 330 : check_table_sync = copydata && origin_none;
2371 :
2372 : /* retain_dead_tuples and table sync checks occur separately */
2373 : Assert(!(check_rdt && check_table_sync));
2374 :
2375 : /* Return if no checks are required */
2376 330 : if (!check_rdt && !check_table_sync)
2377 302 : return;
2378 :
2379 28 : initStringInfo(&cmd);
2380 28 : appendStringInfoString(&cmd,
2381 : "SELECT DISTINCT P.pubname AS pubname\n"
2382 : "FROM pg_publication P,\n"
2383 : " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
2384 : " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid OR"
2385 : " GPT.relid IN (SELECT relid FROM pg_partition_ancestors(PS.srrelid) UNION"
2386 : " SELECT relid FROM pg_partition_tree(PS.srrelid))),\n"
2387 : " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2388 : "WHERE C.oid = GPT.relid AND P.pubname IN (");
2389 28 : GetPublicationsStr(publications, &cmd, true);
2390 28 : appendStringInfoString(&cmd, ")\n");
2391 :
2392 : /*
2393 : * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains
2394 : * the list of relation oids that are already present on the subscriber.
2395 : * This check should be skipped for these tables if checking for table
2396 : * sync scenario. However, when handling the retain_dead_tuples scenario,
2397 : * ensure all tables are checked, as some existing tables may now include
2398 : * changes from other origins due to newly created subscriptions on the
2399 : * publisher.
2400 : */
2401 28 : if (check_table_sync)
2402 : {
2403 28 : for (i = 0; i < subrel_count; i++)
2404 : {
2405 8 : Oid relid = subrel_local_oids[i];
2406 8 : char *schemaname = get_namespace_name(get_rel_namespace(relid));
2407 8 : char *tablename = get_rel_name(relid);
2408 :
2409 8 : appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
2410 : schemaname, tablename);
2411 : }
2412 : }
2413 :
2414 28 : res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
2415 28 : pfree(cmd.data);
2416 :
2417 28 : if (res->status != WALRCV_OK_TUPLES)
2418 0 : ereport(ERROR,
2419 : (errcode(ERRCODE_CONNECTION_FAILURE),
2420 : errmsg("could not receive list of replicated tables from the publisher: %s",
2421 : res->err)));
2422 :
2423 : /* Process tables. */
2424 28 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2425 38 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2426 : {
2427 : char *pubname;
2428 : bool isnull;
2429 :
2430 10 : pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2431 : Assert(!isnull);
2432 :
2433 10 : ExecClearTuple(slot);
2434 10 : publist = list_append_unique(publist, makeString(pubname));
2435 : }
2436 :
2437 : /*
2438 : * Log a warning if the publisher has subscribed to the same table from
2439 : * some other publisher. We cannot know the origin of data during the
2440 : * initial sync. Data origins can be found only from the WAL by looking at
2441 : * the origin id.
2442 : *
2443 : * XXX: For simplicity, we don't check whether the table has any data or
2444 : * not. If the table doesn't have any data then we don't need to
2445 : * distinguish between data having origin and data not having origin so we
2446 : * can avoid logging a warning for table sync scenario.
2447 : */
2448 28 : if (publist)
2449 : {
2450 10 : StringInfo pubnames = makeStringInfo();
2451 10 : StringInfo err_msg = makeStringInfo();
2452 10 : StringInfo err_hint = makeStringInfo();
2453 :
2454 : /* Prepare the list of publication(s) for warning message. */
2455 10 : GetPublicationsStr(publist, pubnames, false);
2456 :
2457 10 : if (check_table_sync)
2458 : {
2459 8 : appendStringInfo(err_msg, _("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin"),
2460 : subname);
2461 8 : appendStringInfoString(err_hint, _("Verify that initial data copied from the publisher tables did not come from other origins."));
2462 : }
2463 : else
2464 : {
2465 2 : appendStringInfo(err_msg, _("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins"),
2466 : subname);
2467 2 : appendStringInfoString(err_hint, _("Consider using origin = NONE or disabling retain_dead_tuples."));
2468 : }
2469 :
2470 10 : ereport(WARNING,
2471 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2472 : errmsg_internal("%s", err_msg->data),
2473 : errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2474 : "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2475 : list_length(publist), pubnames->data),
2476 : errhint_internal("%s", err_hint->data));
2477 : }
2478 :
2479 28 : ExecDropSingleTupleTableSlot(slot);
2480 :
2481 28 : walrcv_clear_result(res);
2482 : }
2483 :
2484 : /*
2485 : * Determine whether the retain_dead_tuples can be enabled based on the
2486 : * publisher's status.
2487 : *
2488 : * This option is disallowed if the publisher is running a version earlier
2489 : * than the PG19, or if the publisher is in recovery (i.e., it is a standby
2490 : * server).
2491 : *
2492 : * See comments atop worker.c for a detailed explanation.
2493 : */
2494 : static void
2495 24 : check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
2496 : {
2497 : WalRcvExecResult *res;
2498 24 : Oid RecoveryRow[1] = {BOOLOID};
2499 : TupleTableSlot *slot;
2500 : bool isnull;
2501 : bool remote_in_recovery;
2502 :
2503 24 : if (walrcv_server_version(wrconn) < 19000)
2504 0 : ereport(ERROR,
2505 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2506 : errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
2507 :
2508 24 : res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow);
2509 :
2510 24 : if (res->status != WALRCV_OK_TUPLES)
2511 0 : ereport(ERROR,
2512 : (errcode(ERRCODE_CONNECTION_FAILURE),
2513 : errmsg("could not obtain recovery progress from the publisher: %s",
2514 : res->err)));
2515 :
2516 24 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2517 24 : if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2518 0 : elog(ERROR, "failed to fetch tuple for the recovery progress");
2519 :
2520 24 : remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull));
2521 :
2522 24 : if (remote_in_recovery)
2523 0 : ereport(ERROR,
2524 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2525 : errmsg("cannot enable retain_dead_tuples if the publisher is in recovery."));
2526 :
2527 24 : ExecDropSingleTupleTableSlot(slot);
2528 :
2529 24 : walrcv_clear_result(res);
2530 24 : }
2531 :
2532 : /*
2533 : * Check if the subscriber's configuration is adequate to enable the
2534 : * retain_dead_tuples option.
2535 : *
2536 : * Issue an ERROR if the wal_level does not support the use of replication
2537 : * slots when check_guc is set to true.
2538 : *
2539 : * Issue a WARNING if track_commit_timestamp is not enabled when check_guc is
2540 : * set to true. This is only to highlight the importance of enabling
2541 : * track_commit_timestamp instead of catching all the misconfigurations, as
2542 : * this setting can be adjusted after subscription creation. Without it, the
2543 : * apply worker will simply skip conflict detection.
2544 : *
2545 : * Issue a WARNING or NOTICE if the subscription is disabled and the retention
2546 : * is active. Do not raise an ERROR since users can only modify
2547 : * retain_dead_tuples for disabled subscriptions. And as long as the
2548 : * subscription is enabled promptly, it will not pose issues.
2549 : *
2550 : * Issue a NOTICE to inform users that max_retention_duration is
2551 : * ineffective when retain_dead_tuples is disabled for a subscription. An ERROR
2552 : * is not issued because setting max_retention_duration causes no harm,
2553 : * even when it is ineffective.
2554 : */
2555 : void
2556 478 : CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
2557 : int elevel_for_sub_disabled,
2558 : bool retain_dead_tuples, bool retention_active,
2559 : bool max_retention_set)
2560 : {
2561 : Assert(elevel_for_sub_disabled == NOTICE ||
2562 : elevel_for_sub_disabled == WARNING);
2563 :
2564 478 : if (retain_dead_tuples)
2565 : {
2566 34 : if (check_guc && wal_level < WAL_LEVEL_REPLICA)
2567 0 : ereport(ERROR,
2568 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2569 : errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
2570 : errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
2571 :
2572 34 : if (check_guc && !track_commit_timestamp)
2573 8 : ereport(WARNING,
2574 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2575 : errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
2576 : errhint("Consider setting \"%s\" to true.",
2577 : "track_commit_timestamp"));
2578 :
2579 34 : if (sub_disabled && retention_active)
2580 14 : ereport(elevel_for_sub_disabled,
2581 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2582 : errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
2583 : (elevel_for_sub_disabled > NOTICE)
2584 : ? errhint("Consider setting %s to false.",
2585 : "retain_dead_tuples") : 0);
2586 : }
2587 444 : else if (max_retention_set)
2588 : {
2589 6 : ereport(NOTICE,
2590 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2591 : errmsg("max_retention_duration is ineffective when retain_dead_tuples is disabled"));
2592 : }
2593 478 : }
2594 :
2595 : /*
2596 : * Get the list of tables which belong to specified publications on the
2597 : * publisher connection.
2598 : *
2599 : * Note that we don't support the case where the column list is different for
2600 : * the same table in different publications to avoid sending unwanted column
2601 : * information for some of the rows. This can happen when both the column
2602 : * list and row filter are specified for different publications.
2603 : */
2604 : static List *
2605 304 : fetch_table_list(WalReceiverConn *wrconn, List *publications)
2606 : {
2607 : WalRcvExecResult *res;
2608 : StringInfoData cmd;
2609 : TupleTableSlot *slot;
2610 304 : Oid tableRow[3] = {TEXTOID, TEXTOID, InvalidOid};
2611 304 : List *tablelist = NIL;
2612 304 : int server_version = walrcv_server_version(wrconn);
2613 304 : bool check_columnlist = (server_version >= 150000);
2614 304 : StringInfo pub_names = makeStringInfo();
2615 :
2616 304 : initStringInfo(&cmd);
2617 :
2618 : /* Build the pub_names comma-separated string. */
2619 304 : GetPublicationsStr(publications, pub_names, true);
2620 :
2621 : /* Get the list of tables from the publisher. */
2622 304 : if (server_version >= 160000)
2623 : {
2624 304 : tableRow[2] = INT2VECTOROID;
2625 :
2626 : /*
2627 : * From version 16, we allowed passing multiple publications to the
2628 : * function pg_get_publication_tables. This helped to filter out the
2629 : * partition table whose ancestor is also published in this
2630 : * publication array.
2631 : *
2632 : * Join pg_get_publication_tables with pg_publication to exclude
2633 : * non-existing publications.
2634 : *
2635 : * Note that attrs are always stored in sorted order so we don't need
2636 : * to worry if different publications have specified them in a
2637 : * different order. See pub_collist_validate.
2638 : */
2639 304 : appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, gpt.attrs\n"
2640 : " FROM pg_class c\n"
2641 : " JOIN pg_namespace n ON n.oid = c.relnamespace\n"
2642 : " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
2643 : " FROM pg_publication\n"
2644 : " WHERE pubname IN ( %s )) AS gpt\n"
2645 : " ON gpt.relid = c.oid\n",
2646 : pub_names->data);
2647 : }
2648 : else
2649 : {
2650 0 : tableRow[2] = NAMEARRAYOID;
2651 0 : appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n");
2652 :
2653 : /* Get column lists for each relation if the publisher supports it */
2654 0 : if (check_columnlist)
2655 0 : appendStringInfoString(&cmd, ", t.attnames\n");
2656 :
2657 0 : appendStringInfo(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
2658 : " WHERE t.pubname IN ( %s )",
2659 : pub_names->data);
2660 : }
2661 :
2662 304 : destroyStringInfo(pub_names);
2663 :
2664 304 : res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow);
2665 304 : pfree(cmd.data);
2666 :
2667 304 : if (res->status != WALRCV_OK_TUPLES)
2668 0 : ereport(ERROR,
2669 : (errcode(ERRCODE_CONNECTION_FAILURE),
2670 : errmsg("could not receive list of replicated tables from the publisher: %s",
2671 : res->err)));
2672 :
2673 : /* Process tables. */
2674 304 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2675 844 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2676 : {
2677 : char *nspname;
2678 : char *relname;
2679 : bool isnull;
2680 : RangeVar *rv;
2681 :
2682 542 : nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2683 : Assert(!isnull);
2684 542 : relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
2685 : Assert(!isnull);
2686 :
2687 542 : rv = makeRangeVar(nspname, relname, -1);
2688 :
2689 542 : if (check_columnlist && list_member(tablelist, rv))
2690 2 : ereport(ERROR,
2691 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2692 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
2693 : nspname, relname));
2694 : else
2695 540 : tablelist = lappend(tablelist, rv);
2696 :
2697 540 : ExecClearTuple(slot);
2698 : }
2699 302 : ExecDropSingleTupleTableSlot(slot);
2700 :
2701 302 : walrcv_clear_result(res);
2702 :
2703 302 : return tablelist;
2704 : }
2705 :
2706 : /*
2707 : * This is to report the connection failure while dropping replication slots.
2708 : * Here, we report the WARNING for all tablesync slots so that user can drop
2709 : * them manually, if required.
2710 : */
2711 : static void
2712 0 : ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
2713 : {
2714 : ListCell *lc;
2715 :
2716 0 : foreach(lc, rstates)
2717 : {
2718 0 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
2719 0 : Oid relid = rstate->relid;
2720 :
2721 : /* Only cleanup resources of tablesync workers */
2722 0 : if (!OidIsValid(relid))
2723 0 : continue;
2724 :
2725 : /*
2726 : * Caller needs to ensure that relstate doesn't change underneath us.
2727 : * See DropSubscription where we get the relstates.
2728 : */
2729 0 : if (rstate->state != SUBREL_STATE_SYNCDONE)
2730 : {
2731 0 : char syncslotname[NAMEDATALEN] = {0};
2732 :
2733 0 : ReplicationSlotNameForTablesync(subid, relid, syncslotname,
2734 : sizeof(syncslotname));
2735 0 : elog(WARNING, "could not drop tablesync replication slot \"%s\"",
2736 : syncslotname);
2737 : }
2738 : }
2739 :
2740 0 : ereport(ERROR,
2741 : (errcode(ERRCODE_CONNECTION_FAILURE),
2742 : errmsg("could not connect to publisher when attempting to drop replication slot \"%s\": %s",
2743 : slotname, err),
2744 : /* translator: %s is an SQL ALTER command */
2745 : errhint("Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
2746 : "ALTER SUBSCRIPTION ... DISABLE",
2747 : "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
2748 : }
2749 :
2750 : /*
2751 : * Check for duplicates in the given list of publications and error out if
2752 : * found one. Add publications to datums as text datums, if datums is not
2753 : * NULL.
2754 : */
2755 : static void
2756 446 : check_duplicates_in_publist(List *publist, Datum *datums)
2757 : {
2758 : ListCell *cell;
2759 446 : int j = 0;
2760 :
2761 1016 : foreach(cell, publist)
2762 : {
2763 588 : char *name = strVal(lfirst(cell));
2764 : ListCell *pcell;
2765 :
2766 886 : foreach(pcell, publist)
2767 : {
2768 886 : char *pname = strVal(lfirst(pcell));
2769 :
2770 886 : if (pcell == cell)
2771 570 : break;
2772 :
2773 316 : if (strcmp(name, pname) == 0)
2774 18 : ereport(ERROR,
2775 : (errcode(ERRCODE_DUPLICATE_OBJECT),
2776 : errmsg("publication name \"%s\" used more than once",
2777 : pname)));
2778 : }
2779 :
2780 570 : if (datums)
2781 484 : datums[j++] = CStringGetTextDatum(name);
2782 : }
2783 428 : }
2784 :
2785 : /*
2786 : * Merge current subscription's publications and user-specified publications
2787 : * from ADD/DROP PUBLICATIONS.
2788 : *
2789 : * If addpub is true, we will add the list of publications into oldpublist.
2790 : * Otherwise, we will delete the list of publications from oldpublist. The
2791 : * returned list is a copy, oldpublist itself is not changed.
2792 : *
2793 : * subname is the subscription name, for error messages.
2794 : */
2795 : static List *
2796 54 : merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
2797 : {
2798 : ListCell *lc;
2799 :
2800 54 : oldpublist = list_copy(oldpublist);
2801 :
2802 54 : check_duplicates_in_publist(newpublist, NULL);
2803 :
2804 92 : foreach(lc, newpublist)
2805 : {
2806 68 : char *name = strVal(lfirst(lc));
2807 : ListCell *lc2;
2808 68 : bool found = false;
2809 :
2810 134 : foreach(lc2, oldpublist)
2811 : {
2812 110 : char *pubname = strVal(lfirst(lc2));
2813 :
2814 110 : if (strcmp(name, pubname) == 0)
2815 : {
2816 44 : found = true;
2817 44 : if (addpub)
2818 12 : ereport(ERROR,
2819 : (errcode(ERRCODE_DUPLICATE_OBJECT),
2820 : errmsg("publication \"%s\" is already in subscription \"%s\"",
2821 : name, subname)));
2822 : else
2823 32 : oldpublist = foreach_delete_current(oldpublist, lc2);
2824 :
2825 32 : break;
2826 : }
2827 : }
2828 :
2829 56 : if (addpub && !found)
2830 18 : oldpublist = lappend(oldpublist, makeString(name));
2831 38 : else if (!addpub && !found)
2832 6 : ereport(ERROR,
2833 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2834 : errmsg("publication \"%s\" is not in subscription \"%s\"",
2835 : name, subname)));
2836 : }
2837 :
2838 : /*
2839 : * XXX Probably no strong reason for this, but for now it's to make ALTER
2840 : * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
2841 : */
2842 24 : if (!oldpublist)
2843 6 : ereport(ERROR,
2844 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2845 : errmsg("cannot drop all the publications from a subscription")));
2846 :
2847 18 : return oldpublist;
2848 : }
2849 :
2850 : /*
2851 : * Extract the streaming mode value from a DefElem. This is like
2852 : * defGetBoolean() but also accepts the special value of "parallel".
2853 : */
2854 : char
2855 830 : defGetStreamingMode(DefElem *def)
2856 : {
2857 : /*
2858 : * If no parameter value given, assume "true" is meant.
2859 : */
2860 830 : if (!def->arg)
2861 0 : return LOGICALREP_STREAM_ON;
2862 :
2863 : /*
2864 : * Allow 0, 1, "false", "true", "off", "on" or "parallel".
2865 : */
2866 830 : switch (nodeTag(def->arg))
2867 : {
2868 0 : case T_Integer:
2869 0 : switch (intVal(def->arg))
2870 : {
2871 0 : case 0:
2872 0 : return LOGICALREP_STREAM_OFF;
2873 0 : case 1:
2874 0 : return LOGICALREP_STREAM_ON;
2875 0 : default:
2876 : /* otherwise, error out below */
2877 0 : break;
2878 : }
2879 0 : break;
2880 830 : default:
2881 : {
2882 830 : char *sval = defGetString(def);
2883 :
2884 : /*
2885 : * The set of strings accepted here should match up with the
2886 : * grammar's opt_boolean_or_string production.
2887 : */
2888 1654 : if (pg_strcasecmp(sval, "false") == 0 ||
2889 824 : pg_strcasecmp(sval, "off") == 0)
2890 12 : return LOGICALREP_STREAM_OFF;
2891 1618 : if (pg_strcasecmp(sval, "true") == 0 ||
2892 800 : pg_strcasecmp(sval, "on") == 0)
2893 88 : return LOGICALREP_STREAM_ON;
2894 730 : if (pg_strcasecmp(sval, "parallel") == 0)
2895 724 : return LOGICALREP_STREAM_PARALLEL;
2896 : }
2897 6 : break;
2898 : }
2899 :
2900 6 : ereport(ERROR,
2901 : (errcode(ERRCODE_SYNTAX_ERROR),
2902 : errmsg("%s requires a Boolean value or \"parallel\"",
2903 : def->defname)));
2904 : return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
2905 : }
|