Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * subscriptioncmds.c
4 : * subscription catalog manipulation functions
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/subscriptioncmds.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/commit_ts.h"
18 : #include "access/htup_details.h"
19 : #include "access/table.h"
20 : #include "access/twophase.h"
21 : #include "access/xact.h"
22 : #include "catalog/catalog.h"
23 : #include "catalog/dependency.h"
24 : #include "catalog/indexing.h"
25 : #include "catalog/namespace.h"
26 : #include "catalog/objectaccess.h"
27 : #include "catalog/objectaddress.h"
28 : #include "catalog/pg_authid_d.h"
29 : #include "catalog/pg_database_d.h"
30 : #include "catalog/pg_subscription.h"
31 : #include "catalog/pg_subscription_rel.h"
32 : #include "catalog/pg_type.h"
33 : #include "commands/dbcommands.h"
34 : #include "commands/defrem.h"
35 : #include "commands/event_trigger.h"
36 : #include "commands/subscriptioncmds.h"
37 : #include "executor/executor.h"
38 : #include "miscadmin.h"
39 : #include "nodes/makefuncs.h"
40 : #include "pgstat.h"
41 : #include "replication/logicallauncher.h"
42 : #include "replication/logicalworker.h"
43 : #include "replication/origin.h"
44 : #include "replication/slot.h"
45 : #include "replication/walreceiver.h"
46 : #include "replication/walsender.h"
47 : #include "replication/worker_internal.h"
48 : #include "storage/lmgr.h"
49 : #include "utils/acl.h"
50 : #include "utils/builtins.h"
51 : #include "utils/guc.h"
52 : #include "utils/lsyscache.h"
53 : #include "utils/memutils.h"
54 : #include "utils/pg_lsn.h"
55 : #include "utils/syscache.h"
56 :
57 : /*
58 : * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
59 : * command.
60 : */
61 : #define SUBOPT_CONNECT 0x00000001
62 : #define SUBOPT_ENABLED 0x00000002
63 : #define SUBOPT_CREATE_SLOT 0x00000004
64 : #define SUBOPT_SLOT_NAME 0x00000008
65 : #define SUBOPT_COPY_DATA 0x00000010
66 : #define SUBOPT_SYNCHRONOUS_COMMIT 0x00000020
67 : #define SUBOPT_REFRESH 0x00000040
68 : #define SUBOPT_BINARY 0x00000080
69 : #define SUBOPT_STREAMING 0x00000100
70 : #define SUBOPT_TWOPHASE_COMMIT 0x00000200
71 : #define SUBOPT_DISABLE_ON_ERR 0x00000400
72 : #define SUBOPT_PASSWORD_REQUIRED 0x00000800
73 : #define SUBOPT_RUN_AS_OWNER 0x00001000
74 : #define SUBOPT_FAILOVER 0x00002000
75 : #define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000
76 : #define SUBOPT_LSN 0x00008000
77 : #define SUBOPT_ORIGIN 0x00010000
78 :
79 : /* check if the 'val' has 'bits' set */
80 : #define IsSet(val, bits) (((val) & (bits)) == (bits))
81 :
82 : /*
83 : * Structure to hold a bitmap representing the user-provided CREATE/ALTER
84 : * SUBSCRIPTION command options and the parsed/default values of each of them.
85 : */
86 : typedef struct SubOpts
87 : {
88 : bits32 specified_opts;
89 : char *slot_name;
90 : char *synchronous_commit;
91 : bool connect;
92 : bool enabled;
93 : bool create_slot;
94 : bool copy_data;
95 : bool refresh;
96 : bool binary;
97 : char streaming;
98 : bool twophase;
99 : bool disableonerr;
100 : bool passwordrequired;
101 : bool runasowner;
102 : bool failover;
103 : bool retaindeadtuples;
104 : char *origin;
105 : XLogRecPtr lsn;
106 : } SubOpts;
107 :
108 : static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
109 : static void check_publications_origin(WalReceiverConn *wrconn,
110 : List *publications, bool copydata,
111 : bool retain_dead_tuples, char *origin,
112 : Oid *subrel_local_oids, int subrel_count,
113 : char *subname);
114 : static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn);
115 : static void check_duplicates_in_publist(List *publist, Datum *datums);
116 : static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
117 : static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
118 : static void CheckAlterSubOption(Subscription *sub, const char *option,
119 : bool slot_needs_update, bool isTopLevel);
120 :
121 :
122 : /*
123 : * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
124 : *
125 : * Since not all options can be specified in both commands, this function
126 : * will report an error if mutually exclusive options are specified.
127 : */
128 : static void
129 920 : parse_subscription_options(ParseState *pstate, List *stmt_options,
130 : bits32 supported_opts, SubOpts *opts)
131 : {
132 : ListCell *lc;
133 :
134 : /* Start out with cleared opts. */
135 920 : memset(opts, 0, sizeof(SubOpts));
136 :
137 : /* caller must expect some option */
138 : Assert(supported_opts != 0);
139 :
140 : /* If connect option is supported, these others also need to be. */
141 : Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
142 : IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
143 : SUBOPT_COPY_DATA));
144 :
145 : /* Set default values for the supported options. */
146 920 : if (IsSet(supported_opts, SUBOPT_CONNECT))
147 466 : opts->connect = true;
148 920 : if (IsSet(supported_opts, SUBOPT_ENABLED))
149 558 : opts->enabled = true;
150 920 : if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
151 466 : opts->create_slot = true;
152 920 : if (IsSet(supported_opts, SUBOPT_COPY_DATA))
153 604 : opts->copy_data = true;
154 920 : if (IsSet(supported_opts, SUBOPT_REFRESH))
155 86 : opts->refresh = true;
156 920 : if (IsSet(supported_opts, SUBOPT_BINARY))
157 666 : opts->binary = false;
158 920 : if (IsSet(supported_opts, SUBOPT_STREAMING))
159 666 : opts->streaming = LOGICALREP_STREAM_PARALLEL;
160 920 : if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
161 666 : opts->twophase = false;
162 920 : if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
163 666 : opts->disableonerr = false;
164 920 : if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED))
165 666 : opts->passwordrequired = true;
166 920 : if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER))
167 666 : opts->runasowner = false;
168 920 : if (IsSet(supported_opts, SUBOPT_FAILOVER))
169 666 : opts->failover = false;
170 920 : if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES))
171 666 : opts->retaindeadtuples = false;
172 920 : if (IsSet(supported_opts, SUBOPT_ORIGIN))
173 666 : opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
174 :
175 : /* Parse options */
176 1818 : foreach(lc, stmt_options)
177 : {
178 958 : DefElem *defel = (DefElem *) lfirst(lc);
179 :
180 958 : if (IsSet(supported_opts, SUBOPT_CONNECT) &&
181 566 : strcmp(defel->defname, "connect") == 0)
182 : {
183 176 : if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
184 0 : errorConflictingDefElem(defel, pstate);
185 :
186 176 : opts->specified_opts |= SUBOPT_CONNECT;
187 176 : opts->connect = defGetBoolean(defel);
188 : }
189 782 : else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
190 482 : strcmp(defel->defname, "enabled") == 0)
191 : {
192 130 : if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
193 0 : errorConflictingDefElem(defel, pstate);
194 :
195 130 : opts->specified_opts |= SUBOPT_ENABLED;
196 130 : opts->enabled = defGetBoolean(defel);
197 : }
198 652 : else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
199 352 : strcmp(defel->defname, "create_slot") == 0)
200 : {
201 40 : if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
202 0 : errorConflictingDefElem(defel, pstate);
203 :
204 40 : opts->specified_opts |= SUBOPT_CREATE_SLOT;
205 40 : opts->create_slot = defGetBoolean(defel);
206 : }
207 612 : else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
208 516 : strcmp(defel->defname, "slot_name") == 0)
209 : {
210 150 : if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
211 0 : errorConflictingDefElem(defel, pstate);
212 :
213 150 : opts->specified_opts |= SUBOPT_SLOT_NAME;
214 150 : opts->slot_name = defGetString(defel);
215 :
216 : /* Setting slot_name = NONE is treated as no slot name. */
217 150 : if (strcmp(opts->slot_name, "none") == 0)
218 116 : opts->slot_name = NULL;
219 : else
220 34 : ReplicationSlotValidateName(opts->slot_name, false, ERROR);
221 : }
222 462 : else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
223 306 : strcmp(defel->defname, "copy_data") == 0)
224 : {
225 54 : if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
226 0 : errorConflictingDefElem(defel, pstate);
227 :
228 54 : opts->specified_opts |= SUBOPT_COPY_DATA;
229 54 : opts->copy_data = defGetBoolean(defel);
230 : }
231 408 : else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
232 318 : strcmp(defel->defname, "synchronous_commit") == 0)
233 : {
234 12 : if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
235 0 : errorConflictingDefElem(defel, pstate);
236 :
237 12 : opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
238 12 : opts->synchronous_commit = defGetString(defel);
239 :
240 : /* Test if the given value is valid for synchronous_commit GUC. */
241 12 : (void) set_config_option("synchronous_commit", opts->synchronous_commit,
242 : PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
243 : false, 0, false);
244 : }
245 396 : else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
246 66 : strcmp(defel->defname, "refresh") == 0)
247 : {
248 66 : if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
249 0 : errorConflictingDefElem(defel, pstate);
250 :
251 66 : opts->specified_opts |= SUBOPT_REFRESH;
252 66 : opts->refresh = defGetBoolean(defel);
253 : }
254 330 : else if (IsSet(supported_opts, SUBOPT_BINARY) &&
255 306 : strcmp(defel->defname, "binary") == 0)
256 : {
257 32 : if (IsSet(opts->specified_opts, SUBOPT_BINARY))
258 0 : errorConflictingDefElem(defel, pstate);
259 :
260 32 : opts->specified_opts |= SUBOPT_BINARY;
261 32 : opts->binary = defGetBoolean(defel);
262 : }
263 298 : else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
264 274 : strcmp(defel->defname, "streaming") == 0)
265 : {
266 74 : if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
267 0 : errorConflictingDefElem(defel, pstate);
268 :
269 74 : opts->specified_opts |= SUBOPT_STREAMING;
270 74 : opts->streaming = defGetStreamingMode(defel);
271 : }
272 224 : else if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
273 200 : strcmp(defel->defname, "two_phase") == 0)
274 : {
275 42 : if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
276 0 : errorConflictingDefElem(defel, pstate);
277 :
278 42 : opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
279 42 : opts->twophase = defGetBoolean(defel);
280 : }
281 182 : else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
282 158 : strcmp(defel->defname, "disable_on_error") == 0)
283 : {
284 20 : if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
285 0 : errorConflictingDefElem(defel, pstate);
286 :
287 20 : opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
288 20 : opts->disableonerr = defGetBoolean(defel);
289 : }
290 162 : else if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED) &&
291 138 : strcmp(defel->defname, "password_required") == 0)
292 : {
293 24 : if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
294 0 : errorConflictingDefElem(defel, pstate);
295 :
296 24 : opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
297 24 : opts->passwordrequired = defGetBoolean(defel);
298 : }
299 138 : else if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER) &&
300 114 : strcmp(defel->defname, "run_as_owner") == 0)
301 : {
302 18 : if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
303 0 : errorConflictingDefElem(defel, pstate);
304 :
305 18 : opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
306 18 : opts->runasowner = defGetBoolean(defel);
307 : }
308 120 : else if (IsSet(supported_opts, SUBOPT_FAILOVER) &&
309 96 : strcmp(defel->defname, "failover") == 0)
310 : {
311 24 : if (IsSet(opts->specified_opts, SUBOPT_FAILOVER))
312 0 : errorConflictingDefElem(defel, pstate);
313 :
314 24 : opts->specified_opts |= SUBOPT_FAILOVER;
315 24 : opts->failover = defGetBoolean(defel);
316 : }
317 96 : else if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES) &&
318 72 : strcmp(defel->defname, "retain_dead_tuples") == 0)
319 : {
320 24 : if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
321 0 : errorConflictingDefElem(defel, pstate);
322 :
323 24 : opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
324 24 : opts->retaindeadtuples = defGetBoolean(defel);
325 : }
326 72 : else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
327 48 : strcmp(defel->defname, "origin") == 0)
328 : {
329 42 : if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
330 0 : errorConflictingDefElem(defel, pstate);
331 :
332 42 : opts->specified_opts |= SUBOPT_ORIGIN;
333 42 : pfree(opts->origin);
334 :
335 : /*
336 : * Even though the "origin" parameter allows only "none" and "any"
337 : * values, it is implemented as a string type so that the
338 : * parameter can be extended in future versions to support
339 : * filtering using origin names specified by the user.
340 : */
341 42 : opts->origin = defGetString(defel);
342 :
343 58 : if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
344 16 : (pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
345 6 : ereport(ERROR,
346 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
347 : errmsg("unrecognized origin value: \"%s\"", opts->origin));
348 : }
349 30 : else if (IsSet(supported_opts, SUBOPT_LSN) &&
350 24 : strcmp(defel->defname, "lsn") == 0)
351 18 : {
352 24 : char *lsn_str = defGetString(defel);
353 : XLogRecPtr lsn;
354 :
355 24 : if (IsSet(opts->specified_opts, SUBOPT_LSN))
356 0 : errorConflictingDefElem(defel, pstate);
357 :
358 : /* Setting lsn = NONE is treated as resetting LSN */
359 24 : if (strcmp(lsn_str, "none") == 0)
360 6 : lsn = InvalidXLogRecPtr;
361 : else
362 : {
363 : /* Parse the argument as LSN */
364 18 : lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
365 : CStringGetDatum(lsn_str)));
366 :
367 18 : if (XLogRecPtrIsInvalid(lsn))
368 6 : ereport(ERROR,
369 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
370 : errmsg("invalid WAL location (LSN): %s", lsn_str)));
371 : }
372 :
373 18 : opts->specified_opts |= SUBOPT_LSN;
374 18 : opts->lsn = lsn;
375 : }
376 : else
377 6 : ereport(ERROR,
378 : (errcode(ERRCODE_SYNTAX_ERROR),
379 : errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
380 : }
381 :
382 : /*
383 : * We've been explicitly asked to not connect, that requires some
384 : * additional processing.
385 : */
386 860 : if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
387 : {
388 : /* Check for incompatible options from the user. */
389 140 : if (opts->enabled &&
390 140 : IsSet(opts->specified_opts, SUBOPT_ENABLED))
391 6 : ereport(ERROR,
392 : (errcode(ERRCODE_SYNTAX_ERROR),
393 : /*- translator: both %s are strings of the form "option = value" */
394 : errmsg("%s and %s are mutually exclusive options",
395 : "connect = false", "enabled = true")));
396 :
397 134 : if (opts->create_slot &&
398 128 : IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
399 6 : ereport(ERROR,
400 : (errcode(ERRCODE_SYNTAX_ERROR),
401 : errmsg("%s and %s are mutually exclusive options",
402 : "connect = false", "create_slot = true")));
403 :
404 128 : if (opts->copy_data &&
405 122 : IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
406 6 : ereport(ERROR,
407 : (errcode(ERRCODE_SYNTAX_ERROR),
408 : errmsg("%s and %s are mutually exclusive options",
409 : "connect = false", "copy_data = true")));
410 :
411 : /* Change the defaults of other options. */
412 122 : opts->enabled = false;
413 122 : opts->create_slot = false;
414 122 : opts->copy_data = false;
415 : }
416 :
417 : /*
418 : * Do additional checking for disallowed combination when slot_name = NONE
419 : * was used.
420 : */
421 842 : if (!opts->slot_name &&
422 814 : IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
423 : {
424 110 : if (opts->enabled)
425 : {
426 18 : if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
427 6 : ereport(ERROR,
428 : (errcode(ERRCODE_SYNTAX_ERROR),
429 : /*- translator: both %s are strings of the form "option = value" */
430 : errmsg("%s and %s are mutually exclusive options",
431 : "slot_name = NONE", "enabled = true")));
432 : else
433 12 : ereport(ERROR,
434 : (errcode(ERRCODE_SYNTAX_ERROR),
435 : /*- translator: both %s are strings of the form "option = value" */
436 : errmsg("subscription with %s must also set %s",
437 : "slot_name = NONE", "enabled = false")));
438 : }
439 :
440 92 : if (opts->create_slot)
441 : {
442 12 : if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
443 6 : ereport(ERROR,
444 : (errcode(ERRCODE_SYNTAX_ERROR),
445 : /*- translator: both %s are strings of the form "option = value" */
446 : errmsg("%s and %s are mutually exclusive options",
447 : "slot_name = NONE", "create_slot = true")));
448 : else
449 6 : ereport(ERROR,
450 : (errcode(ERRCODE_SYNTAX_ERROR),
451 : /*- translator: both %s are strings of the form "option = value" */
452 : errmsg("subscription with %s must also set %s",
453 : "slot_name = NONE", "create_slot = false")));
454 : }
455 : }
456 812 : }
457 :
458 : /*
459 : * Check that the specified publications are present on the publisher.
460 : */
461 : static void
462 254 : check_publications(WalReceiverConn *wrconn, List *publications)
463 : {
464 : WalRcvExecResult *res;
465 : StringInfo cmd;
466 : TupleTableSlot *slot;
467 254 : List *publicationsCopy = NIL;
468 254 : Oid tableRow[1] = {TEXTOID};
469 :
470 254 : cmd = makeStringInfo();
471 254 : appendStringInfoString(cmd, "SELECT t.pubname FROM\n"
472 : " pg_catalog.pg_publication t WHERE\n"
473 : " t.pubname IN (");
474 254 : GetPublicationsStr(publications, cmd, true);
475 254 : appendStringInfoChar(cmd, ')');
476 :
477 254 : res = walrcv_exec(wrconn, cmd->data, 1, tableRow);
478 254 : destroyStringInfo(cmd);
479 :
480 254 : if (res->status != WALRCV_OK_TUPLES)
481 0 : ereport(ERROR,
482 : errmsg("could not receive list of publications from the publisher: %s",
483 : res->err));
484 :
485 254 : publicationsCopy = list_copy(publications);
486 :
487 : /* Process publication(s). */
488 254 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
489 560 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
490 : {
491 : char *pubname;
492 : bool isnull;
493 :
494 306 : pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
495 : Assert(!isnull);
496 :
497 : /* Delete the publication present in publisher from the list. */
498 306 : publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
499 306 : ExecClearTuple(slot);
500 : }
501 :
502 254 : ExecDropSingleTupleTableSlot(slot);
503 :
504 254 : walrcv_clear_result(res);
505 :
506 254 : if (list_length(publicationsCopy))
507 : {
508 : /* Prepare the list of non-existent publication(s) for error message. */
509 8 : StringInfo pubnames = makeStringInfo();
510 :
511 8 : GetPublicationsStr(publicationsCopy, pubnames, false);
512 8 : ereport(WARNING,
513 : errcode(ERRCODE_UNDEFINED_OBJECT),
514 : errmsg_plural("publication %s does not exist on the publisher",
515 : "publications %s do not exist on the publisher",
516 : list_length(publicationsCopy),
517 : pubnames->data));
518 : }
519 254 : }
520 :
521 : /*
522 : * Auxiliary function to build a text array out of a list of String nodes.
523 : */
524 : static Datum
525 384 : publicationListToArray(List *publist)
526 : {
527 : ArrayType *arr;
528 : Datum *datums;
529 : MemoryContext memcxt;
530 : MemoryContext oldcxt;
531 :
532 : /* Create memory context for temporary allocations. */
533 384 : memcxt = AllocSetContextCreate(CurrentMemoryContext,
534 : "publicationListToArray to array",
535 : ALLOCSET_DEFAULT_SIZES);
536 384 : oldcxt = MemoryContextSwitchTo(memcxt);
537 :
538 384 : datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
539 :
540 384 : check_duplicates_in_publist(publist, datums);
541 :
542 378 : MemoryContextSwitchTo(oldcxt);
543 :
544 378 : arr = construct_array_builtin(datums, list_length(publist), TEXTOID);
545 :
546 378 : MemoryContextDelete(memcxt);
547 :
548 378 : return PointerGetDatum(arr);
549 : }
550 :
551 : /*
552 : * Create new subscription.
553 : */
554 : ObjectAddress
555 466 : CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
556 : bool isTopLevel)
557 : {
558 : Relation rel;
559 : ObjectAddress myself;
560 : Oid subid;
561 : bool nulls[Natts_pg_subscription];
562 : Datum values[Natts_pg_subscription];
563 466 : Oid owner = GetUserId();
564 : HeapTuple tup;
565 : char *conninfo;
566 : char originname[NAMEDATALEN];
567 : List *publications;
568 : bits32 supported_opts;
569 466 : SubOpts opts = {0};
570 : AclResult aclresult;
571 :
572 : /*
573 : * Parse and check options.
574 : *
575 : * Connection and publication should not be specified here.
576 : */
577 466 : supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
578 : SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
579 : SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
580 : SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
581 : SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
582 : SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
583 : SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN);
584 466 : parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
585 :
586 : /*
587 : * Since creating a replication slot is not transactional, rolling back
588 : * the transaction leaves the created replication slot. So we cannot run
589 : * CREATE SUBSCRIPTION inside a transaction block if creating a
590 : * replication slot.
591 : */
592 382 : if (opts.create_slot)
593 250 : PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
594 :
595 : /*
596 : * We don't want to allow unprivileged users to be able to trigger
597 : * attempts to access arbitrary network destinations, so require the user
598 : * to have been specifically authorized to create subscriptions.
599 : */
600 376 : if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
601 6 : ereport(ERROR,
602 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
603 : errmsg("permission denied to create subscription"),
604 : errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
605 : "pg_create_subscription")));
606 :
607 : /*
608 : * Since a subscription is a database object, we also check for CREATE
609 : * permission on the database.
610 : */
611 370 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
612 : owner, ACL_CREATE);
613 370 : if (aclresult != ACLCHECK_OK)
614 12 : aclcheck_error(aclresult, OBJECT_DATABASE,
615 6 : get_database_name(MyDatabaseId));
616 :
617 : /*
618 : * Non-superusers are required to set a password for authentication, and
619 : * that password must be used by the target server, but the superuser can
620 : * exempt a subscription from this requirement.
621 : */
622 364 : if (!opts.passwordrequired && !superuser_arg(owner))
623 6 : ereport(ERROR,
624 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
625 : errmsg("password_required=false is superuser-only"),
626 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
627 :
628 : /*
629 : * If built with appropriate switch, whine when regression-testing
630 : * conventions for subscription names are violated.
631 : */
632 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
633 : if (strncmp(stmt->subname, "regress_", 8) != 0)
634 : elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
635 : #endif
636 :
637 358 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
638 :
639 : /* Check if name is used */
640 358 : subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
641 : MyDatabaseId, CStringGetDatum(stmt->subname));
642 358 : if (OidIsValid(subid))
643 : {
644 6 : ereport(ERROR,
645 : (errcode(ERRCODE_DUPLICATE_OBJECT),
646 : errmsg("subscription \"%s\" already exists",
647 : stmt->subname)));
648 : }
649 :
650 : /* Ensure that we can enable retain_dead_tuples */
651 352 : if (opts.retaindeadtuples)
652 8 : CheckSubDeadTupleRetention(true, !opts.enabled, WARNING);
653 :
654 352 : if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
655 310 : opts.slot_name == NULL)
656 310 : opts.slot_name = stmt->subname;
657 :
658 : /* The default for synchronous_commit of subscriptions is off. */
659 352 : if (opts.synchronous_commit == NULL)
660 352 : opts.synchronous_commit = "off";
661 :
662 352 : conninfo = stmt->conninfo;
663 352 : publications = stmt->publication;
664 :
665 : /* Load the library providing us libpq calls. */
666 352 : load_file("libpqwalreceiver", false);
667 :
668 : /* Check the connection info string. */
669 352 : walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
670 :
671 : /* Everything ok, form a new tuple. */
672 334 : memset(values, 0, sizeof(values));
673 334 : memset(nulls, false, sizeof(nulls));
674 :
675 334 : subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
676 : Anum_pg_subscription_oid);
677 334 : values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
678 334 : values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
679 334 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
680 334 : values[Anum_pg_subscription_subname - 1] =
681 334 : DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
682 334 : values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
683 334 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
684 334 : values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
685 334 : values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
686 334 : values[Anum_pg_subscription_subtwophasestate - 1] =
687 334 : CharGetDatum(opts.twophase ?
688 : LOGICALREP_TWOPHASE_STATE_PENDING :
689 : LOGICALREP_TWOPHASE_STATE_DISABLED);
690 334 : values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
691 334 : values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
692 334 : values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
693 334 : values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
694 334 : values[Anum_pg_subscription_subretaindeadtuples - 1] =
695 334 : BoolGetDatum(opts.retaindeadtuples);
696 334 : values[Anum_pg_subscription_subconninfo - 1] =
697 334 : CStringGetTextDatum(conninfo);
698 334 : if (opts.slot_name)
699 314 : values[Anum_pg_subscription_subslotname - 1] =
700 314 : DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
701 : else
702 20 : nulls[Anum_pg_subscription_subslotname - 1] = true;
703 334 : values[Anum_pg_subscription_subsynccommit - 1] =
704 334 : CStringGetTextDatum(opts.synchronous_commit);
705 328 : values[Anum_pg_subscription_subpublications - 1] =
706 334 : publicationListToArray(publications);
707 328 : values[Anum_pg_subscription_suborigin - 1] =
708 328 : CStringGetTextDatum(opts.origin);
709 :
710 328 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
711 :
712 : /* Insert tuple into catalog. */
713 328 : CatalogTupleInsert(rel, tup);
714 328 : heap_freetuple(tup);
715 :
716 328 : recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
717 :
718 328 : ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
719 328 : replorigin_create(originname);
720 :
721 : /*
722 : * Connect to remote side to execute requested commands and fetch table
723 : * info.
724 : */
725 328 : if (opts.connect)
726 : {
727 : char *err;
728 : WalReceiverConn *wrconn;
729 : List *tables;
730 : ListCell *lc;
731 : char table_state;
732 : bool must_use_password;
733 :
734 : /* Try to connect to the publisher. */
735 242 : must_use_password = !superuser_arg(owner) && opts.passwordrequired;
736 242 : wrconn = walrcv_connect(conninfo, true, true, must_use_password,
737 : stmt->subname, &err);
738 242 : if (!wrconn)
739 6 : ereport(ERROR,
740 : (errcode(ERRCODE_CONNECTION_FAILURE),
741 : errmsg("subscription \"%s\" could not connect to the publisher: %s",
742 : stmt->subname, err)));
743 :
744 236 : PG_TRY();
745 : {
746 236 : check_publications(wrconn, publications);
747 236 : check_publications_origin(wrconn, publications, opts.copy_data,
748 236 : opts.retaindeadtuples, opts.origin,
749 : NULL, 0, stmt->subname);
750 :
751 236 : if (opts.retaindeadtuples)
752 6 : check_pub_dead_tuple_retention(wrconn);
753 :
754 : /*
755 : * Set sync state based on if we were asked to do data copy or
756 : * not.
757 : */
758 236 : table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
759 :
760 : /*
761 : * Get the table list from publisher and build local table status
762 : * info.
763 : */
764 236 : tables = fetch_table_list(wrconn, publications);
765 592 : foreach(lc, tables)
766 : {
767 358 : RangeVar *rv = (RangeVar *) lfirst(lc);
768 : Oid relid;
769 :
770 358 : relid = RangeVarGetRelid(rv, AccessShareLock, false);
771 :
772 : /* Check for supported relkind. */
773 358 : CheckSubscriptionRelkind(get_rel_relkind(relid),
774 358 : rv->schemaname, rv->relname);
775 :
776 358 : AddSubscriptionRelState(subid, relid, table_state,
777 : InvalidXLogRecPtr, true);
778 : }
779 :
780 : /*
781 : * If requested, create permanent slot for the subscription. We
782 : * won't use the initial snapshot for anything, so no need to
783 : * export it.
784 : */
785 234 : if (opts.create_slot)
786 : {
787 224 : bool twophase_enabled = false;
788 :
789 : Assert(opts.slot_name);
790 :
791 : /*
792 : * Even if two_phase is set, don't create the slot with
793 : * two-phase enabled. Will enable it once all the tables are
794 : * synced and ready. This avoids race-conditions like prepared
795 : * transactions being skipped due to changes not being applied
796 : * due to checks in should_apply_changes_for_rel() when
797 : * tablesync for the corresponding tables are in progress. See
798 : * comments atop worker.c.
799 : *
800 : * Note that if tables were specified but copy_data is false
801 : * then it is safe to enable two_phase up-front because those
802 : * tables are already initially in READY state. When the
803 : * subscription has no tables, we leave the twophase state as
804 : * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
805 : * PUBLICATION to work.
806 : */
807 224 : if (opts.twophase && !opts.copy_data && tables != NIL)
808 2 : twophase_enabled = true;
809 :
810 224 : walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
811 : opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
812 :
813 224 : if (twophase_enabled)
814 2 : UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
815 :
816 224 : ereport(NOTICE,
817 : (errmsg("created replication slot \"%s\" on publisher",
818 : opts.slot_name)));
819 : }
820 : }
821 2 : PG_FINALLY();
822 : {
823 236 : walrcv_disconnect(wrconn);
824 : }
825 236 : PG_END_TRY();
826 : }
827 : else
828 86 : ereport(WARNING,
829 : (errmsg("subscription was created, but is not connected"),
830 : errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.")));
831 :
832 320 : table_close(rel, RowExclusiveLock);
833 :
834 320 : pgstat_create_subscription(subid);
835 :
836 320 : if (opts.enabled)
837 220 : ApplyLauncherWakeupAtCommit();
838 :
839 320 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
840 :
841 320 : InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
842 :
843 320 : return myself;
844 : }
845 :
846 : static void
847 66 : AlterSubscription_refresh(Subscription *sub, bool copy_data,
848 : List *validate_publications)
849 : {
850 : char *err;
851 : List *pubrel_names;
852 : List *subrel_states;
853 : Oid *subrel_local_oids;
854 : Oid *pubrel_local_oids;
855 : ListCell *lc;
856 : int off;
857 : int remove_rel_len;
858 : int subrel_count;
859 66 : Relation rel = NULL;
860 : typedef struct SubRemoveRels
861 : {
862 : Oid relid;
863 : char state;
864 : } SubRemoveRels;
865 : SubRemoveRels *sub_remove_rels;
866 : WalReceiverConn *wrconn;
867 : bool must_use_password;
868 :
869 : /* Load the library providing us libpq calls. */
870 66 : load_file("libpqwalreceiver", false);
871 :
872 : /* Try to connect to the publisher. */
873 66 : must_use_password = sub->passwordrequired && !sub->ownersuperuser;
874 66 : wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
875 : sub->name, &err);
876 64 : if (!wrconn)
877 0 : ereport(ERROR,
878 : (errcode(ERRCODE_CONNECTION_FAILURE),
879 : errmsg("subscription \"%s\" could not connect to the publisher: %s",
880 : sub->name, err)));
881 :
882 64 : PG_TRY();
883 : {
884 64 : if (validate_publications)
885 18 : check_publications(wrconn, validate_publications);
886 :
887 : /* Get the table list from publisher. */
888 64 : pubrel_names = fetch_table_list(wrconn, sub->publications);
889 :
890 : /* Get local table list. */
891 64 : subrel_states = GetSubscriptionRelations(sub->oid, false);
892 64 : subrel_count = list_length(subrel_states);
893 :
894 : /*
895 : * Build qsorted array of local table oids for faster lookup. This can
896 : * potentially contain all tables in the database so speed of lookup
897 : * is important.
898 : */
899 64 : subrel_local_oids = palloc(subrel_count * sizeof(Oid));
900 64 : off = 0;
901 238 : foreach(lc, subrel_states)
902 : {
903 174 : SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
904 :
905 174 : subrel_local_oids[off++] = relstate->relid;
906 : }
907 64 : qsort(subrel_local_oids, subrel_count,
908 : sizeof(Oid), oid_cmp);
909 :
910 64 : check_publications_origin(wrconn, sub->publications, copy_data,
911 64 : sub->retaindeadtuples, sub->origin,
912 : subrel_local_oids, subrel_count, sub->name);
913 :
914 : /*
915 : * Rels that we want to remove from subscription and drop any slots
916 : * and origins corresponding to them.
917 : */
918 64 : sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels));
919 :
920 : /*
921 : * Walk over the remote tables and try to match them to locally known
922 : * tables. If the table is not known locally create a new state for
923 : * it.
924 : *
925 : * Also builds array of local oids of remote tables for the next step.
926 : */
927 64 : off = 0;
928 64 : pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
929 :
930 242 : foreach(lc, pubrel_names)
931 : {
932 178 : RangeVar *rv = (RangeVar *) lfirst(lc);
933 : Oid relid;
934 :
935 178 : relid = RangeVarGetRelid(rv, AccessShareLock, false);
936 :
937 : /* Check for supported relkind. */
938 178 : CheckSubscriptionRelkind(get_rel_relkind(relid),
939 178 : rv->schemaname, rv->relname);
940 :
941 178 : pubrel_local_oids[off++] = relid;
942 :
943 178 : if (!bsearch(&relid, subrel_local_oids,
944 : subrel_count, sizeof(Oid), oid_cmp))
945 : {
946 44 : AddSubscriptionRelState(sub->oid, relid,
947 : copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
948 : InvalidXLogRecPtr, true);
949 44 : ereport(DEBUG1,
950 : (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
951 : rv->schemaname, rv->relname, sub->name)));
952 : }
953 : }
954 :
955 : /*
956 : * Next remove state for tables we should not care about anymore using
957 : * the data we collected above
958 : */
959 64 : qsort(pubrel_local_oids, list_length(pubrel_names),
960 : sizeof(Oid), oid_cmp);
961 :
962 64 : remove_rel_len = 0;
963 238 : for (off = 0; off < subrel_count; off++)
964 : {
965 174 : Oid relid = subrel_local_oids[off];
966 :
967 174 : if (!bsearch(&relid, pubrel_local_oids,
968 174 : list_length(pubrel_names), sizeof(Oid), oid_cmp))
969 : {
970 : char state;
971 : XLogRecPtr statelsn;
972 :
973 : /*
974 : * Lock pg_subscription_rel with AccessExclusiveLock to
975 : * prevent any race conditions with the apply worker
976 : * re-launching workers at the same time this code is trying
977 : * to remove those tables.
978 : *
979 : * Even if new worker for this particular rel is restarted it
980 : * won't be able to make any progress as we hold exclusive
981 : * lock on pg_subscription_rel till the transaction end. It
982 : * will simply exit as there is no corresponding rel entry.
983 : *
984 : * This locking also ensures that the state of rels won't
985 : * change till we are done with this refresh operation.
986 : */
987 40 : if (!rel)
988 16 : rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
989 :
990 : /* Last known rel state. */
991 40 : state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
992 :
993 40 : sub_remove_rels[remove_rel_len].relid = relid;
994 40 : sub_remove_rels[remove_rel_len++].state = state;
995 :
996 40 : RemoveSubscriptionRel(sub->oid, relid);
997 :
998 40 : logicalrep_worker_stop(sub->oid, relid);
999 :
1000 : /*
1001 : * For READY state, we would have already dropped the
1002 : * tablesync origin.
1003 : */
1004 40 : if (state != SUBREL_STATE_READY)
1005 : {
1006 : char originname[NAMEDATALEN];
1007 :
1008 : /*
1009 : * Drop the tablesync's origin tracking if exists.
1010 : *
1011 : * It is possible that the origin is not yet created for
1012 : * tablesync worker, this can happen for the states before
1013 : * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or
1014 : * apply worker can also concurrently try to drop the
1015 : * origin and by this time the origin might be already
1016 : * removed. For these reasons, passing missing_ok = true.
1017 : */
1018 0 : ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
1019 : sizeof(originname));
1020 0 : replorigin_drop_by_name(originname, true, false);
1021 : }
1022 :
1023 40 : ereport(DEBUG1,
1024 : (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
1025 : get_namespace_name(get_rel_namespace(relid)),
1026 : get_rel_name(relid),
1027 : sub->name)));
1028 : }
1029 : }
1030 :
1031 : /*
1032 : * Drop the tablesync slots associated with removed tables. This has
1033 : * to be at the end because otherwise if there is an error while doing
1034 : * the database operations we won't be able to rollback dropped slots.
1035 : */
1036 104 : for (off = 0; off < remove_rel_len; off++)
1037 : {
1038 40 : if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
1039 0 : sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
1040 : {
1041 0 : char syncslotname[NAMEDATALEN] = {0};
1042 :
1043 : /*
1044 : * For READY/SYNCDONE states we know the tablesync slot has
1045 : * already been dropped by the tablesync worker.
1046 : *
1047 : * For other states, there is no certainty, maybe the slot
1048 : * does not exist yet. Also, if we fail after removing some of
1049 : * the slots, next time, it will again try to drop already
1050 : * dropped slots and fail. For these reasons, we allow
1051 : * missing_ok = true for the drop.
1052 : */
1053 0 : ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid,
1054 : syncslotname, sizeof(syncslotname));
1055 0 : ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
1056 : }
1057 : }
1058 : }
1059 0 : PG_FINALLY();
1060 : {
1061 64 : walrcv_disconnect(wrconn);
1062 : }
1063 64 : PG_END_TRY();
1064 :
1065 64 : if (rel)
1066 16 : table_close(rel, NoLock);
1067 64 : }
1068 :
1069 : /*
1070 : * Common checks for altering failover, two_phase, and retain_dead_tuples
1071 : * options.
1072 : */
1073 : static void
1074 24 : CheckAlterSubOption(Subscription *sub, const char *option,
1075 : bool slot_needs_update, bool isTopLevel)
1076 : {
1077 : Assert(strcmp(option, "failover") == 0 ||
1078 : strcmp(option, "two_phase") == 0 ||
1079 : strcmp(option, "retain_dead_tuples") == 0);
1080 :
1081 : /*
1082 : * Altering the retain_dead_tuples option does not update the slot on the
1083 : * publisher.
1084 : */
1085 : Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0);
1086 :
1087 : /*
1088 : * Do not allow changing the option if the subscription is enabled. This
1089 : * is because both failover and two_phase options of the slot on the
1090 : * publisher cannot be modified if the slot is currently acquired by the
1091 : * existing walsender.
1092 : *
1093 : * Note that two_phase is enabled (aka changed from 'false' to 'true') on
1094 : * the publisher by the existing walsender, so we could have allowed that
1095 : * even when the subscription is enabled. But we kept this restriction for
1096 : * the sake of consistency and simplicity.
1097 : *
1098 : * Additionally, do not allow changing the retain_dead_tuples option when
1099 : * the subscription is enabled to prevent race conditions arising from the
1100 : * new option value being acknowledged asynchronously by the launcher and
1101 : * apply workers.
1102 : *
1103 : * Without the restriction, a race condition may arise when a user
1104 : * disables and immediately re-enables the retain_dead_tuples option. In
1105 : * this case, the launcher might drop the slot upon noticing the disabled
1106 : * action, while the apply worker may keep maintaining
1107 : * oldest_nonremovable_xid without noticing the option change. During this
1108 : * period, a transaction ID wraparound could falsely make this ID appear
1109 : * as if it originates from the future w.r.t the transaction ID stored in
1110 : * the slot maintained by launcher.
1111 : *
1112 : * Similarly, if the user enables retain_dead_tuples concurrently with the
1113 : * launcher starting the worker, the apply worker may start calculating
1114 : * oldest_nonremovable_xid before the launcher notices the enable action.
1115 : * Consequently, the launcher may update slot.xmin to a newer value than
1116 : * that maintained by the worker. In subsequent cycles, upon integrating
1117 : * the worker's oldest_nonremovable_xid, the launcher might detect a
1118 : * retreat in the calculated xmin, necessitating additional handling.
1119 : *
1120 : * XXX To address the above race conditions, we can define
1121 : * oldest_nonremovable_xid as FullTransactionID and adds the check to
1122 : * disallow retreating the conflict slot's xmin. For now, we kept the
1123 : * implementation simple by disallowing change to the retain_dead_tuples,
1124 : * but in the future we can change this after some more analysis.
1125 : *
1126 : * Note that we could restrict only the enabling of retain_dead_tuples to
1127 : * avoid the race conditions described above, but we maintain the
1128 : * restriction for both enable and disable operations for the sake of
1129 : * consistency.
1130 : */
1131 24 : if (sub->enabled)
1132 4 : ereport(ERROR,
1133 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1134 : errmsg("cannot set option \"%s\" for enabled subscription",
1135 : option)));
1136 :
1137 20 : if (slot_needs_update)
1138 : {
1139 : StringInfoData cmd;
1140 :
1141 : /*
1142 : * A valid slot must be associated with the subscription for us to
1143 : * modify any of the slot's properties.
1144 : */
1145 14 : if (!sub->slotname)
1146 0 : ereport(ERROR,
1147 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1148 : errmsg("cannot set option \"%s\" for a subscription that does not have a slot name",
1149 : option)));
1150 :
1151 : /* The changed option of the slot can't be rolled back. */
1152 14 : initStringInfo(&cmd);
1153 14 : appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
1154 :
1155 14 : PreventInTransactionBlock(isTopLevel, cmd.data);
1156 8 : pfree(cmd.data);
1157 : }
1158 14 : }
1159 :
1160 : /*
1161 : * Alter the existing subscription.
1162 : */
1163 : ObjectAddress
1164 486 : AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1165 : bool isTopLevel)
1166 : {
1167 : Relation rel;
1168 : ObjectAddress myself;
1169 : bool nulls[Natts_pg_subscription];
1170 : bool replaces[Natts_pg_subscription];
1171 : Datum values[Natts_pg_subscription];
1172 : HeapTuple tup;
1173 : Oid subid;
1174 486 : bool update_tuple = false;
1175 486 : bool update_failover = false;
1176 486 : bool update_two_phase = false;
1177 486 : bool check_pub_rdt = false;
1178 : bool retain_dead_tuples;
1179 : char *origin;
1180 : Subscription *sub;
1181 : Form_pg_subscription form;
1182 : bits32 supported_opts;
1183 486 : SubOpts opts = {0};
1184 :
1185 486 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1186 :
1187 : /* Fetch the existing tuple. */
1188 486 : tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
1189 : CStringGetDatum(stmt->subname));
1190 :
1191 486 : if (!HeapTupleIsValid(tup))
1192 6 : ereport(ERROR,
1193 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1194 : errmsg("subscription \"%s\" does not exist",
1195 : stmt->subname)));
1196 :
1197 480 : form = (Form_pg_subscription) GETSTRUCT(tup);
1198 480 : subid = form->oid;
1199 :
1200 : /* must be owner */
1201 480 : if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
1202 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
1203 0 : stmt->subname);
1204 :
1205 480 : sub = GetSubscription(subid, false);
1206 :
1207 480 : retain_dead_tuples = sub->retaindeadtuples;
1208 480 : origin = sub->origin;
1209 :
1210 : /*
1211 : * Don't allow non-superuser modification of a subscription with
1212 : * password_required=false.
1213 : */
1214 480 : if (!sub->passwordrequired && !superuser())
1215 0 : ereport(ERROR,
1216 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1217 : errmsg("password_required=false is superuser-only"),
1218 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1219 :
1220 : /* Lock the subscription so nobody else can do anything with it. */
1221 480 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
1222 :
1223 : /* Form a new tuple. */
1224 480 : memset(values, 0, sizeof(values));
1225 480 : memset(nulls, false, sizeof(nulls));
1226 480 : memset(replaces, false, sizeof(replaces));
1227 :
1228 480 : switch (stmt->kind)
1229 : {
1230 200 : case ALTER_SUBSCRIPTION_OPTIONS:
1231 : {
1232 200 : supported_opts = (SUBOPT_SLOT_NAME |
1233 : SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
1234 : SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
1235 : SUBOPT_DISABLE_ON_ERR |
1236 : SUBOPT_PASSWORD_REQUIRED |
1237 : SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
1238 : SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN);
1239 :
1240 200 : parse_subscription_options(pstate, stmt->options,
1241 : supported_opts, &opts);
1242 :
1243 182 : if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1244 : {
1245 : /*
1246 : * The subscription must be disabled to allow slot_name as
1247 : * 'none', otherwise, the apply worker will repeatedly try
1248 : * to stream the data using that slot_name which neither
1249 : * exists on the publisher nor the user will be allowed to
1250 : * create it.
1251 : */
1252 66 : if (sub->enabled && !opts.slot_name)
1253 0 : ereport(ERROR,
1254 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1255 : errmsg("cannot set %s for enabled subscription",
1256 : "slot_name = NONE")));
1257 :
1258 66 : if (opts.slot_name)
1259 6 : values[Anum_pg_subscription_subslotname - 1] =
1260 6 : DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
1261 : else
1262 60 : nulls[Anum_pg_subscription_subslotname - 1] = true;
1263 66 : replaces[Anum_pg_subscription_subslotname - 1] = true;
1264 : }
1265 :
1266 182 : if (opts.synchronous_commit)
1267 : {
1268 6 : values[Anum_pg_subscription_subsynccommit - 1] =
1269 6 : CStringGetTextDatum(opts.synchronous_commit);
1270 6 : replaces[Anum_pg_subscription_subsynccommit - 1] = true;
1271 : }
1272 :
1273 182 : if (IsSet(opts.specified_opts, SUBOPT_BINARY))
1274 : {
1275 18 : values[Anum_pg_subscription_subbinary - 1] =
1276 18 : BoolGetDatum(opts.binary);
1277 18 : replaces[Anum_pg_subscription_subbinary - 1] = true;
1278 : }
1279 :
1280 182 : if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
1281 : {
1282 30 : values[Anum_pg_subscription_substream - 1] =
1283 30 : CharGetDatum(opts.streaming);
1284 30 : replaces[Anum_pg_subscription_substream - 1] = true;
1285 : }
1286 :
1287 182 : if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
1288 : {
1289 : values[Anum_pg_subscription_subdisableonerr - 1]
1290 6 : = BoolGetDatum(opts.disableonerr);
1291 : replaces[Anum_pg_subscription_subdisableonerr - 1]
1292 6 : = true;
1293 : }
1294 :
1295 182 : if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
1296 : {
1297 : /* Non-superuser may not disable password_required. */
1298 12 : if (!opts.passwordrequired && !superuser())
1299 0 : ereport(ERROR,
1300 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1301 : errmsg("password_required=false is superuser-only"),
1302 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1303 :
1304 : values[Anum_pg_subscription_subpasswordrequired - 1]
1305 12 : = BoolGetDatum(opts.passwordrequired);
1306 : replaces[Anum_pg_subscription_subpasswordrequired - 1]
1307 12 : = true;
1308 : }
1309 :
1310 182 : if (IsSet(opts.specified_opts, SUBOPT_RUN_AS_OWNER))
1311 : {
1312 14 : values[Anum_pg_subscription_subrunasowner - 1] =
1313 14 : BoolGetDatum(opts.runasowner);
1314 14 : replaces[Anum_pg_subscription_subrunasowner - 1] = true;
1315 : }
1316 :
1317 182 : if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
1318 : {
1319 : /*
1320 : * We need to update both the slot and the subscription
1321 : * for the two_phase option. We can enable the two_phase
1322 : * option for a slot only once the initial data
1323 : * synchronization is done. This is to avoid missing some
1324 : * data as explained in comments atop worker.c.
1325 : */
1326 6 : update_two_phase = !opts.twophase;
1327 :
1328 6 : CheckAlterSubOption(sub, "two_phase", update_two_phase,
1329 : isTopLevel);
1330 :
1331 : /*
1332 : * Modifying the two_phase slot option requires a slot
1333 : * lookup by slot name, so changing the slot name at the
1334 : * same time is not allowed.
1335 : */
1336 6 : if (update_two_phase &&
1337 2 : IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1338 0 : ereport(ERROR,
1339 : (errcode(ERRCODE_SYNTAX_ERROR),
1340 : errmsg("\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
1341 :
1342 : /*
1343 : * Note that workers may still survive even if the
1344 : * subscription has been disabled.
1345 : *
1346 : * Ensure workers have already been exited to avoid
1347 : * getting prepared transactions while we are disabling
1348 : * the two_phase option. Otherwise, the changes of an
1349 : * already prepared transaction can be replicated again
1350 : * along with its corresponding commit, leading to
1351 : * duplicate data or errors.
1352 : */
1353 6 : if (logicalrep_workers_find(subid, true, true))
1354 0 : ereport(ERROR,
1355 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1356 : errmsg("cannot alter \"two_phase\" when logical replication worker is still running"),
1357 : errhint("Try again after some time.")));
1358 :
1359 : /*
1360 : * two_phase cannot be disabled if there are any
1361 : * uncommitted prepared transactions present otherwise it
1362 : * can lead to duplicate data or errors as explained in
1363 : * the comment above.
1364 : */
1365 6 : if (update_two_phase &&
1366 2 : sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
1367 2 : LookupGXactBySubid(subid))
1368 0 : ereport(ERROR,
1369 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1370 : errmsg("cannot disable \"two_phase\" when prepared transactions exist"),
1371 : errhint("Resolve these transactions and try again.")));
1372 :
1373 : /* Change system catalog accordingly */
1374 6 : values[Anum_pg_subscription_subtwophasestate - 1] =
1375 6 : CharGetDatum(opts.twophase ?
1376 : LOGICALREP_TWOPHASE_STATE_PENDING :
1377 : LOGICALREP_TWOPHASE_STATE_DISABLED);
1378 6 : replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1379 : }
1380 :
1381 182 : if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
1382 : {
1383 : /*
1384 : * Similar to the two_phase case above, we need to update
1385 : * the failover option for both the slot and the
1386 : * subscription.
1387 : */
1388 14 : update_failover = true;
1389 :
1390 14 : CheckAlterSubOption(sub, "failover", update_failover,
1391 : isTopLevel);
1392 :
1393 6 : values[Anum_pg_subscription_subfailover - 1] =
1394 6 : BoolGetDatum(opts.failover);
1395 6 : replaces[Anum_pg_subscription_subfailover - 1] = true;
1396 : }
1397 :
1398 174 : if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
1399 : {
1400 4 : values[Anum_pg_subscription_subretaindeadtuples - 1] =
1401 4 : BoolGetDatum(opts.retaindeadtuples);
1402 4 : replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true;
1403 :
1404 4 : CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
1405 :
1406 : /*
1407 : * Workers may continue running even after the
1408 : * subscription has been disabled.
1409 : *
1410 : * To prevent race conditions (as described in
1411 : * CheckAlterSubOption()), ensure that all worker
1412 : * processes have already exited before proceeding.
1413 : */
1414 2 : if (logicalrep_workers_find(subid, true, true))
1415 0 : ereport(ERROR,
1416 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1417 : errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
1418 : errhint("Try again after some time.")));
1419 :
1420 : /*
1421 : * Remind the user that enabling subscription will prevent
1422 : * the accumulation of dead tuples.
1423 : */
1424 2 : if (opts.retaindeadtuples)
1425 2 : CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE);
1426 :
1427 : /*
1428 : * Notify the launcher to manage the replication slot for
1429 : * conflict detection. This ensures that replication slot
1430 : * is efficiently handled (created, updated, or dropped)
1431 : * in response to any configuration changes.
1432 : */
1433 2 : ApplyLauncherWakeupAtCommit();
1434 :
1435 2 : check_pub_rdt = opts.retaindeadtuples;
1436 2 : retain_dead_tuples = opts.retaindeadtuples;
1437 : }
1438 :
1439 172 : if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
1440 : {
1441 10 : values[Anum_pg_subscription_suborigin - 1] =
1442 10 : CStringGetTextDatum(opts.origin);
1443 10 : replaces[Anum_pg_subscription_suborigin - 1] = true;
1444 :
1445 : /*
1446 : * Check if changes from different origins may be received
1447 : * from the publisher when the origin is changed to ANY
1448 : * and retain_dead_tuples is enabled.
1449 : */
1450 14 : check_pub_rdt = retain_dead_tuples &&
1451 4 : pg_strcasecmp(opts.origin, LOGICALREP_ORIGIN_ANY) == 0;
1452 :
1453 10 : origin = opts.origin;
1454 : }
1455 :
1456 172 : update_tuple = true;
1457 172 : break;
1458 : }
1459 :
1460 92 : case ALTER_SUBSCRIPTION_ENABLED:
1461 : {
1462 92 : parse_subscription_options(pstate, stmt->options,
1463 : SUBOPT_ENABLED, &opts);
1464 : Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
1465 :
1466 92 : if (!sub->slotname && opts.enabled)
1467 6 : ereport(ERROR,
1468 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1469 : errmsg("cannot enable subscription that does not have a slot name")));
1470 :
1471 : /*
1472 : * Check track_commit_timestamp only when enabling the
1473 : * subscription in case it was disabled after creation. See
1474 : * comments atop CheckSubDeadTupleRetention() for details.
1475 : */
1476 86 : if (sub->retaindeadtuples)
1477 8 : CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
1478 8 : WARNING);
1479 :
1480 86 : values[Anum_pg_subscription_subenabled - 1] =
1481 86 : BoolGetDatum(opts.enabled);
1482 86 : replaces[Anum_pg_subscription_subenabled - 1] = true;
1483 :
1484 86 : if (opts.enabled)
1485 48 : ApplyLauncherWakeupAtCommit();
1486 :
1487 86 : update_tuple = true;
1488 :
1489 : /*
1490 : * The subscription might be initially created with
1491 : * connect=false and retain_dead_tuples=true, meaning the
1492 : * remote server's status may not be checked. Ensure this
1493 : * check is conducted now.
1494 : */
1495 86 : check_pub_rdt = sub->retaindeadtuples && opts.enabled;
1496 86 : break;
1497 : }
1498 :
1499 20 : case ALTER_SUBSCRIPTION_CONNECTION:
1500 : /* Load the library providing us libpq calls. */
1501 20 : load_file("libpqwalreceiver", false);
1502 : /* Check the connection info string. */
1503 20 : walrcv_check_conninfo(stmt->conninfo,
1504 : sub->passwordrequired && !sub->ownersuperuser);
1505 :
1506 14 : values[Anum_pg_subscription_subconninfo - 1] =
1507 14 : CStringGetTextDatum(stmt->conninfo);
1508 14 : replaces[Anum_pg_subscription_subconninfo - 1] = true;
1509 14 : update_tuple = true;
1510 :
1511 : /*
1512 : * Since the remote server configuration might have changed,
1513 : * perform a check to ensure it permits enabling
1514 : * retain_dead_tuples.
1515 : */
1516 14 : check_pub_rdt = sub->retaindeadtuples;
1517 14 : break;
1518 :
1519 32 : case ALTER_SUBSCRIPTION_SET_PUBLICATION:
1520 : {
1521 32 : supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
1522 32 : parse_subscription_options(pstate, stmt->options,
1523 : supported_opts, &opts);
1524 :
1525 32 : values[Anum_pg_subscription_subpublications - 1] =
1526 32 : publicationListToArray(stmt->publication);
1527 32 : replaces[Anum_pg_subscription_subpublications - 1] = true;
1528 :
1529 32 : update_tuple = true;
1530 :
1531 : /* Refresh if user asked us to. */
1532 32 : if (opts.refresh)
1533 : {
1534 26 : if (!sub->enabled)
1535 0 : ereport(ERROR,
1536 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1537 : errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1538 : errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
1539 :
1540 : /*
1541 : * See ALTER_SUBSCRIPTION_REFRESH for details why this is
1542 : * not allowed.
1543 : */
1544 26 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1545 0 : ereport(ERROR,
1546 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1547 : errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1548 : errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1549 :
1550 26 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1551 :
1552 : /* Make sure refresh sees the new list of publications. */
1553 14 : sub->publications = stmt->publication;
1554 :
1555 14 : AlterSubscription_refresh(sub, opts.copy_data,
1556 : stmt->publication);
1557 : }
1558 :
1559 20 : break;
1560 : }
1561 :
1562 54 : case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
1563 : case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
1564 : {
1565 : List *publist;
1566 54 : bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
1567 :
1568 54 : supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
1569 54 : parse_subscription_options(pstate, stmt->options,
1570 : supported_opts, &opts);
1571 :
1572 54 : publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
1573 18 : values[Anum_pg_subscription_subpublications - 1] =
1574 18 : publicationListToArray(publist);
1575 18 : replaces[Anum_pg_subscription_subpublications - 1] = true;
1576 :
1577 18 : update_tuple = true;
1578 :
1579 : /* Refresh if user asked us to. */
1580 18 : if (opts.refresh)
1581 : {
1582 : /* We only need to validate user specified publications. */
1583 6 : List *validate_publications = (isadd) ? stmt->publication : NULL;
1584 :
1585 6 : if (!sub->enabled)
1586 0 : ereport(ERROR,
1587 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1588 : errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1589 : /* translator: %s is an SQL ALTER command */
1590 : errhint("Use %s instead.",
1591 : isadd ?
1592 : "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
1593 : "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
1594 :
1595 : /*
1596 : * See ALTER_SUBSCRIPTION_REFRESH for details why this is
1597 : * not allowed.
1598 : */
1599 6 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1600 0 : ereport(ERROR,
1601 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1602 : errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1603 : /* translator: %s is an SQL ALTER command */
1604 : errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
1605 : isadd ?
1606 : "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
1607 : "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
1608 :
1609 6 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1610 :
1611 : /* Refresh the new list of publications. */
1612 6 : sub->publications = publist;
1613 :
1614 6 : AlterSubscription_refresh(sub, opts.copy_data,
1615 : validate_publications);
1616 : }
1617 :
1618 18 : break;
1619 : }
1620 :
1621 58 : case ALTER_SUBSCRIPTION_REFRESH:
1622 : {
1623 58 : if (!sub->enabled)
1624 6 : ereport(ERROR,
1625 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1626 : errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
1627 :
1628 52 : parse_subscription_options(pstate, stmt->options,
1629 : SUBOPT_COPY_DATA, &opts);
1630 :
1631 : /*
1632 : * The subscription option "two_phase" requires that
1633 : * replication has passed the initial table synchronization
1634 : * phase before the two_phase becomes properly enabled.
1635 : *
1636 : * But, having reached this two-phase commit "enabled" state
1637 : * we must not allow any subsequent table initialization to
1638 : * occur. So the ALTER SUBSCRIPTION ... REFRESH is disallowed
1639 : * when the user had requested two_phase = on mode.
1640 : *
1641 : * The exception to this restriction is when copy_data =
1642 : * false, because when copy_data is false the tablesync will
1643 : * start already in READY state and will exit directly without
1644 : * doing anything.
1645 : *
1646 : * For more details see comments atop worker.c.
1647 : */
1648 52 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1649 0 : ereport(ERROR,
1650 : (errcode(ERRCODE_SYNTAX_ERROR),
1651 : errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"),
1652 : errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1653 :
1654 52 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
1655 :
1656 46 : AlterSubscription_refresh(sub, opts.copy_data, NULL);
1657 :
1658 44 : break;
1659 : }
1660 :
1661 24 : case ALTER_SUBSCRIPTION_SKIP:
1662 : {
1663 24 : parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
1664 :
1665 : /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
1666 : Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
1667 :
1668 : /*
1669 : * If the user sets subskiplsn, we do a sanity check to make
1670 : * sure that the specified LSN is a probable value.
1671 : */
1672 18 : if (!XLogRecPtrIsInvalid(opts.lsn))
1673 : {
1674 : RepOriginId originid;
1675 : char originname[NAMEDATALEN];
1676 : XLogRecPtr remote_lsn;
1677 :
1678 12 : ReplicationOriginNameForLogicalRep(subid, InvalidOid,
1679 : originname, sizeof(originname));
1680 12 : originid = replorigin_by_name(originname, false);
1681 12 : remote_lsn = replorigin_get_progress(originid, false);
1682 :
1683 : /* Check the given LSN is at least a future LSN */
1684 12 : if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn)
1685 0 : ereport(ERROR,
1686 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1687 : errmsg("skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
1688 : LSN_FORMAT_ARGS(opts.lsn),
1689 : LSN_FORMAT_ARGS(remote_lsn))));
1690 : }
1691 :
1692 18 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
1693 18 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
1694 :
1695 18 : update_tuple = true;
1696 18 : break;
1697 : }
1698 :
1699 0 : default:
1700 0 : elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
1701 : stmt->kind);
1702 : }
1703 :
1704 : /* Update the catalog if needed. */
1705 372 : if (update_tuple)
1706 : {
1707 328 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1708 : replaces);
1709 :
1710 328 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1711 :
1712 328 : heap_freetuple(tup);
1713 : }
1714 :
1715 : /*
1716 : * Try to acquire the connection necessary either for modifying the slot
1717 : * or for checking if the remote server permits enabling
1718 : * retain_dead_tuples.
1719 : *
1720 : * This has to be at the end because otherwise if there is an error while
1721 : * doing the database operations we won't be able to rollback altered
1722 : * slot.
1723 : */
1724 372 : if (update_failover || update_two_phase || check_pub_rdt)
1725 : {
1726 : bool must_use_password;
1727 : char *err;
1728 : WalReceiverConn *wrconn;
1729 :
1730 : /* Load the library providing us libpq calls. */
1731 18 : load_file("libpqwalreceiver", false);
1732 :
1733 : /*
1734 : * Try to connect to the publisher, using the new connection string if
1735 : * available.
1736 : */
1737 18 : must_use_password = sub->passwordrequired && !sub->ownersuperuser;
1738 18 : wrconn = walrcv_connect(stmt->conninfo ? stmt->conninfo : sub->conninfo,
1739 : true, true, must_use_password, sub->name,
1740 : &err);
1741 18 : if (!wrconn)
1742 0 : ereport(ERROR,
1743 : (errcode(ERRCODE_CONNECTION_FAILURE),
1744 : errmsg("subscription \"%s\" could not connect to the publisher: %s",
1745 : sub->name, err)));
1746 :
1747 18 : PG_TRY();
1748 : {
1749 18 : if (retain_dead_tuples)
1750 10 : check_pub_dead_tuple_retention(wrconn);
1751 :
1752 18 : check_publications_origin(wrconn, sub->publications, false,
1753 : retain_dead_tuples, origin, NULL, 0,
1754 : sub->name);
1755 :
1756 18 : if (update_failover || update_two_phase)
1757 8 : walrcv_alter_slot(wrconn, sub->slotname,
1758 : update_failover ? &opts.failover : NULL,
1759 : update_two_phase ? &opts.twophase : NULL);
1760 : }
1761 0 : PG_FINALLY();
1762 : {
1763 18 : walrcv_disconnect(wrconn);
1764 : }
1765 18 : PG_END_TRY();
1766 : }
1767 :
1768 372 : table_close(rel, RowExclusiveLock);
1769 :
1770 372 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
1771 :
1772 372 : InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
1773 :
1774 : /* Wake up related replication workers to handle this change quickly. */
1775 372 : LogicalRepWorkersWakeupAtCommit(subid);
1776 :
1777 372 : return myself;
1778 : }
1779 :
1780 : /*
1781 : * Drop a subscription
1782 : */
1783 : void
1784 238 : DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
1785 : {
1786 : Relation rel;
1787 : ObjectAddress myself;
1788 : HeapTuple tup;
1789 : Oid subid;
1790 : Oid subowner;
1791 : Datum datum;
1792 : bool isnull;
1793 : char *subname;
1794 : char *conninfo;
1795 : char *slotname;
1796 : List *subworkers;
1797 : ListCell *lc;
1798 : char originname[NAMEDATALEN];
1799 238 : char *err = NULL;
1800 : WalReceiverConn *wrconn;
1801 : Form_pg_subscription form;
1802 : List *rstates;
1803 : bool must_use_password;
1804 :
1805 : /*
1806 : * Lock pg_subscription with AccessExclusiveLock to ensure that the
1807 : * launcher doesn't restart new worker during dropping the subscription
1808 : */
1809 238 : rel = table_open(SubscriptionRelationId, AccessExclusiveLock);
1810 :
1811 238 : tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
1812 238 : CStringGetDatum(stmt->subname));
1813 :
1814 238 : if (!HeapTupleIsValid(tup))
1815 : {
1816 12 : table_close(rel, NoLock);
1817 :
1818 12 : if (!stmt->missing_ok)
1819 6 : ereport(ERROR,
1820 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1821 : errmsg("subscription \"%s\" does not exist",
1822 : stmt->subname)));
1823 : else
1824 6 : ereport(NOTICE,
1825 : (errmsg("subscription \"%s\" does not exist, skipping",
1826 : stmt->subname)));
1827 :
1828 84 : return;
1829 : }
1830 :
1831 226 : form = (Form_pg_subscription) GETSTRUCT(tup);
1832 226 : subid = form->oid;
1833 226 : subowner = form->subowner;
1834 226 : must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
1835 :
1836 : /* must be owner */
1837 226 : if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
1838 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
1839 0 : stmt->subname);
1840 :
1841 : /* DROP hook for the subscription being removed */
1842 226 : InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
1843 :
1844 : /*
1845 : * Lock the subscription so nobody else can do anything with it (including
1846 : * the replication workers).
1847 : */
1848 226 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
1849 :
1850 : /* Get subname */
1851 226 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
1852 : Anum_pg_subscription_subname);
1853 226 : subname = pstrdup(NameStr(*DatumGetName(datum)));
1854 :
1855 : /* Get conninfo */
1856 226 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
1857 : Anum_pg_subscription_subconninfo);
1858 226 : conninfo = TextDatumGetCString(datum);
1859 :
1860 : /* Get slotname */
1861 226 : datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
1862 : Anum_pg_subscription_subslotname, &isnull);
1863 226 : if (!isnull)
1864 146 : slotname = pstrdup(NameStr(*DatumGetName(datum)));
1865 : else
1866 80 : slotname = NULL;
1867 :
1868 : /*
1869 : * Since dropping a replication slot is not transactional, the replication
1870 : * slot stays dropped even if the transaction rolls back. So we cannot
1871 : * run DROP SUBSCRIPTION inside a transaction block if dropping the
1872 : * replication slot. Also, in this case, we report a message for dropping
1873 : * the subscription to the cumulative stats system.
1874 : *
1875 : * XXX The command name should really be something like "DROP SUBSCRIPTION
1876 : * of a subscription that is associated with a replication slot", but we
1877 : * don't have the proper facilities for that.
1878 : */
1879 226 : if (slotname)
1880 146 : PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
1881 :
1882 220 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
1883 220 : EventTriggerSQLDropAddObject(&myself, true, true);
1884 :
1885 : /* Remove the tuple from catalog. */
1886 220 : CatalogTupleDelete(rel, &tup->t_self);
1887 :
1888 220 : ReleaseSysCache(tup);
1889 :
1890 : /*
1891 : * Stop all the subscription workers immediately.
1892 : *
1893 : * This is necessary if we are dropping the replication slot, so that the
1894 : * slot becomes accessible.
1895 : *
1896 : * It is also necessary if the subscription is disabled and was disabled
1897 : * in the same transaction. Then the workers haven't seen the disabling
1898 : * yet and will still be running, leading to hangs later when we want to
1899 : * drop the replication origin. If the subscription was disabled before
1900 : * this transaction, then there shouldn't be any workers left, so this
1901 : * won't make a difference.
1902 : *
1903 : * New workers won't be started because we hold an exclusive lock on the
1904 : * subscription till the end of the transaction.
1905 : */
1906 220 : subworkers = logicalrep_workers_find(subid, false, true);
1907 364 : foreach(lc, subworkers)
1908 : {
1909 144 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
1910 :
1911 144 : logicalrep_worker_stop(w->subid, w->relid);
1912 : }
1913 220 : list_free(subworkers);
1914 :
1915 : /*
1916 : * Remove the no-longer-useful entry in the launcher's table of apply
1917 : * worker start times.
1918 : *
1919 : * If this transaction rolls back, the launcher might restart a failed
1920 : * apply worker before wal_retrieve_retry_interval milliseconds have
1921 : * elapsed, but that's pretty harmless.
1922 : */
1923 220 : ApplyLauncherForgetWorkerStartTime(subid);
1924 :
1925 : /*
1926 : * Cleanup of tablesync replication origins.
1927 : *
1928 : * Any READY-state relations would already have dealt with clean-ups.
1929 : *
1930 : * Note that the state can't change because we have already stopped both
1931 : * the apply and tablesync workers and they can't restart because of
1932 : * exclusive lock on the subscription.
1933 : */
1934 220 : rstates = GetSubscriptionRelations(subid, true);
1935 232 : foreach(lc, rstates)
1936 : {
1937 12 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
1938 12 : Oid relid = rstate->relid;
1939 :
1940 : /* Only cleanup resources of tablesync workers */
1941 12 : if (!OidIsValid(relid))
1942 0 : continue;
1943 :
1944 : /*
1945 : * Drop the tablesync's origin tracking if exists.
1946 : *
1947 : * It is possible that the origin is not yet created for tablesync
1948 : * worker so passing missing_ok = true. This can happen for the states
1949 : * before SUBREL_STATE_FINISHEDCOPY.
1950 : */
1951 12 : ReplicationOriginNameForLogicalRep(subid, relid, originname,
1952 : sizeof(originname));
1953 12 : replorigin_drop_by_name(originname, true, false);
1954 : }
1955 :
1956 : /* Clean up dependencies */
1957 220 : deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
1958 :
1959 : /* Remove any associated relation synchronization states. */
1960 220 : RemoveSubscriptionRel(subid, InvalidOid);
1961 :
1962 : /* Remove the origin tracking if exists. */
1963 220 : ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
1964 220 : replorigin_drop_by_name(originname, true, false);
1965 :
1966 : /*
1967 : * Tell the cumulative stats system that the subscription is getting
1968 : * dropped.
1969 : */
1970 220 : pgstat_drop_subscription(subid);
1971 :
1972 : /*
1973 : * If there is no slot associated with the subscription, we can finish
1974 : * here.
1975 : */
1976 220 : if (!slotname && rstates == NIL)
1977 : {
1978 78 : table_close(rel, NoLock);
1979 78 : return;
1980 : }
1981 :
1982 : /*
1983 : * Try to acquire the connection necessary for dropping slots.
1984 : *
1985 : * Note: If the slotname is NONE/NULL then we allow the command to finish
1986 : * and users need to manually cleanup the apply and tablesync worker slots
1987 : * later.
1988 : *
1989 : * This has to be at the end because otherwise if there is an error while
1990 : * doing the database operations we won't be able to rollback dropped
1991 : * slot.
1992 : */
1993 142 : load_file("libpqwalreceiver", false);
1994 :
1995 142 : wrconn = walrcv_connect(conninfo, true, true, must_use_password,
1996 : subname, &err);
1997 142 : if (wrconn == NULL)
1998 : {
1999 0 : if (!slotname)
2000 : {
2001 : /* be tidy */
2002 0 : list_free(rstates);
2003 0 : table_close(rel, NoLock);
2004 0 : return;
2005 : }
2006 : else
2007 : {
2008 0 : ReportSlotConnectionError(rstates, subid, slotname, err);
2009 : }
2010 : }
2011 :
2012 142 : PG_TRY();
2013 : {
2014 154 : foreach(lc, rstates)
2015 : {
2016 12 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
2017 12 : Oid relid = rstate->relid;
2018 :
2019 : /* Only cleanup resources of tablesync workers */
2020 12 : if (!OidIsValid(relid))
2021 0 : continue;
2022 :
2023 : /*
2024 : * Drop the tablesync slots associated with removed tables.
2025 : *
2026 : * For SYNCDONE/READY states, the tablesync slot is known to have
2027 : * already been dropped by the tablesync worker.
2028 : *
2029 : * For other states, there is no certainty, maybe the slot does
2030 : * not exist yet. Also, if we fail after removing some of the
2031 : * slots, next time, it will again try to drop already dropped
2032 : * slots and fail. For these reasons, we allow missing_ok = true
2033 : * for the drop.
2034 : */
2035 12 : if (rstate->state != SUBREL_STATE_SYNCDONE)
2036 : {
2037 10 : char syncslotname[NAMEDATALEN] = {0};
2038 :
2039 10 : ReplicationSlotNameForTablesync(subid, relid, syncslotname,
2040 : sizeof(syncslotname));
2041 10 : ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
2042 : }
2043 : }
2044 :
2045 142 : list_free(rstates);
2046 :
2047 : /*
2048 : * If there is a slot associated with the subscription, then drop the
2049 : * replication slot at the publisher.
2050 : */
2051 142 : if (slotname)
2052 140 : ReplicationSlotDropAtPubNode(wrconn, slotname, false);
2053 : }
2054 0 : PG_FINALLY();
2055 : {
2056 142 : walrcv_disconnect(wrconn);
2057 : }
2058 142 : PG_END_TRY();
2059 :
2060 142 : table_close(rel, NoLock);
2061 : }
2062 :
2063 : /*
2064 : * Drop the replication slot at the publisher node using the replication
2065 : * connection.
2066 : *
2067 : * missing_ok - if true then only issue a LOG message if the slot doesn't
2068 : * exist.
2069 : */
2070 : void
2071 530 : ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
2072 : {
2073 : StringInfoData cmd;
2074 :
2075 : Assert(wrconn);
2076 :
2077 530 : load_file("libpqwalreceiver", false);
2078 :
2079 530 : initStringInfo(&cmd);
2080 530 : appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
2081 :
2082 530 : PG_TRY();
2083 : {
2084 : WalRcvExecResult *res;
2085 :
2086 530 : res = walrcv_exec(wrconn, cmd.data, 0, NULL);
2087 :
2088 530 : if (res->status == WALRCV_OK_COMMAND)
2089 : {
2090 : /* NOTICE. Success. */
2091 526 : ereport(NOTICE,
2092 : (errmsg("dropped replication slot \"%s\" on publisher",
2093 : slotname)));
2094 : }
2095 4 : else if (res->status == WALRCV_ERROR &&
2096 4 : missing_ok &&
2097 4 : res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
2098 : {
2099 : /* LOG. Error, but missing_ok = true. */
2100 4 : ereport(LOG,
2101 : (errmsg("could not drop replication slot \"%s\" on publisher: %s",
2102 : slotname, res->err)));
2103 : }
2104 : else
2105 : {
2106 : /* ERROR. */
2107 0 : ereport(ERROR,
2108 : (errcode(ERRCODE_CONNECTION_FAILURE),
2109 : errmsg("could not drop replication slot \"%s\" on publisher: %s",
2110 : slotname, res->err)));
2111 : }
2112 :
2113 530 : walrcv_clear_result(res);
2114 : }
2115 0 : PG_FINALLY();
2116 : {
2117 530 : pfree(cmd.data);
2118 : }
2119 530 : PG_END_TRY();
2120 530 : }
2121 :
2122 : /*
2123 : * Internal workhorse for changing a subscription owner
2124 : */
2125 : static void
2126 18 : AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
2127 : {
2128 : Form_pg_subscription form;
2129 : AclResult aclresult;
2130 :
2131 18 : form = (Form_pg_subscription) GETSTRUCT(tup);
2132 :
2133 18 : if (form->subowner == newOwnerId)
2134 4 : return;
2135 :
2136 14 : if (!object_ownercheck(SubscriptionRelationId, form->oid, GetUserId()))
2137 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
2138 0 : NameStr(form->subname));
2139 :
2140 : /*
2141 : * Don't allow non-superuser modification of a subscription with
2142 : * password_required=false.
2143 : */
2144 14 : if (!form->subpasswordrequired && !superuser())
2145 0 : ereport(ERROR,
2146 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2147 : errmsg("password_required=false is superuser-only"),
2148 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
2149 :
2150 : /* Must be able to become new owner */
2151 14 : check_can_set_role(GetUserId(), newOwnerId);
2152 :
2153 : /*
2154 : * current owner must have CREATE on database
2155 : *
2156 : * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
2157 : * other object types behave differently (e.g. you can't give a table to a
2158 : * user who lacks CREATE privileges on a schema).
2159 : */
2160 8 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
2161 : GetUserId(), ACL_CREATE);
2162 8 : if (aclresult != ACLCHECK_OK)
2163 0 : aclcheck_error(aclresult, OBJECT_DATABASE,
2164 0 : get_database_name(MyDatabaseId));
2165 :
2166 8 : form->subowner = newOwnerId;
2167 8 : CatalogTupleUpdate(rel, &tup->t_self, tup);
2168 :
2169 : /* Update owner dependency reference */
2170 8 : changeDependencyOnOwner(SubscriptionRelationId,
2171 : form->oid,
2172 : newOwnerId);
2173 :
2174 8 : InvokeObjectPostAlterHook(SubscriptionRelationId,
2175 : form->oid, 0);
2176 :
2177 : /* Wake up related background processes to handle this change quickly. */
2178 8 : ApplyLauncherWakeupAtCommit();
2179 8 : LogicalRepWorkersWakeupAtCommit(form->oid);
2180 : }
2181 :
2182 : /*
2183 : * Change subscription owner -- by name
2184 : */
2185 : ObjectAddress
2186 18 : AlterSubscriptionOwner(const char *name, Oid newOwnerId)
2187 : {
2188 : Oid subid;
2189 : HeapTuple tup;
2190 : Relation rel;
2191 : ObjectAddress address;
2192 : Form_pg_subscription form;
2193 :
2194 18 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
2195 :
2196 18 : tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
2197 : CStringGetDatum(name));
2198 :
2199 18 : if (!HeapTupleIsValid(tup))
2200 0 : ereport(ERROR,
2201 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2202 : errmsg("subscription \"%s\" does not exist", name)));
2203 :
2204 18 : form = (Form_pg_subscription) GETSTRUCT(tup);
2205 18 : subid = form->oid;
2206 :
2207 18 : AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
2208 :
2209 12 : ObjectAddressSet(address, SubscriptionRelationId, subid);
2210 :
2211 12 : heap_freetuple(tup);
2212 :
2213 12 : table_close(rel, RowExclusiveLock);
2214 :
2215 12 : return address;
2216 : }
2217 :
2218 : /*
2219 : * Change subscription owner -- by OID
2220 : */
2221 : void
2222 0 : AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
2223 : {
2224 : HeapTuple tup;
2225 : Relation rel;
2226 :
2227 0 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
2228 :
2229 0 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
2230 :
2231 0 : if (!HeapTupleIsValid(tup))
2232 0 : ereport(ERROR,
2233 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2234 : errmsg("subscription with OID %u does not exist", subid)));
2235 :
2236 0 : AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
2237 :
2238 0 : heap_freetuple(tup);
2239 :
2240 0 : table_close(rel, RowExclusiveLock);
2241 0 : }
2242 :
2243 : /*
2244 : * Check and log a warning if the publisher has subscribed to the same table,
2245 : * its partition ancestors (if it's a partition), or its partition children (if
2246 : * it's a partitioned table), from some other publishers. This check is
2247 : * required in the following scenarios:
2248 : *
2249 : * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements
2250 : * with "copy_data = true" and "origin = none":
2251 : * - Warn the user that data with an origin might have been copied.
2252 : * - This check is skipped for tables already added, as incremental sync via
2253 : * WAL allows origin tracking. The list of such tables is in
2254 : * subrel_local_oids.
2255 : *
2256 : * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements
2257 : * with "retain_dead_tuples = true" and "origin = any", and for ALTER
2258 : * SUBSCRIPTION statements that modify retain_dead_tuples or origin, or
2259 : * when the publisher's status changes (e.g., due to a connection string
2260 : * update):
2261 : * - Warn the user that only conflict detection info for local changes on
2262 : * the publisher is retained. Data from other origins may lack sufficient
2263 : * details for reliable conflict detection.
2264 : * - See comments atop worker.c for more details.
2265 : */
2266 : static void
2267 318 : check_publications_origin(WalReceiverConn *wrconn, List *publications,
2268 : bool copydata, bool retain_dead_tuples,
2269 : char *origin, Oid *subrel_local_oids,
2270 : int subrel_count, char *subname)
2271 : {
2272 : WalRcvExecResult *res;
2273 : StringInfoData cmd;
2274 : TupleTableSlot *slot;
2275 318 : Oid tableRow[1] = {TEXTOID};
2276 318 : List *publist = NIL;
2277 : int i;
2278 : bool check_rdt;
2279 : bool check_table_sync;
2280 636 : bool origin_none = origin &&
2281 318 : pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0;
2282 :
2283 : /*
2284 : * Enable retain_dead_tuples checks only when origin is set to 'any',
2285 : * since with origin='none' only local changes are replicated to the
2286 : * subscriber.
2287 : */
2288 318 : check_rdt = retain_dead_tuples && !origin_none;
2289 :
2290 : /*
2291 : * Enable table synchronization checks only when origin is 'none', to
2292 : * ensure that data from other origins is not inadvertently copied.
2293 : */
2294 318 : check_table_sync = copydata && origin_none;
2295 :
2296 : /* retain_dead_tuples and table sync checks occur separately */
2297 : Assert(!(check_rdt && check_table_sync));
2298 :
2299 : /* Return if no checks are required */
2300 318 : if (!check_rdt && !check_table_sync)
2301 292 : return;
2302 :
2303 26 : initStringInfo(&cmd);
2304 26 : appendStringInfoString(&cmd,
2305 : "SELECT DISTINCT P.pubname AS pubname\n"
2306 : "FROM pg_publication P,\n"
2307 : " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
2308 : " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid OR"
2309 : " GPT.relid IN (SELECT relid FROM pg_partition_ancestors(PS.srrelid) UNION"
2310 : " SELECT relid FROM pg_partition_tree(PS.srrelid))),\n"
2311 : " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2312 : "WHERE C.oid = GPT.relid AND P.pubname IN (");
2313 26 : GetPublicationsStr(publications, &cmd, true);
2314 26 : appendStringInfoString(&cmd, ")\n");
2315 :
2316 : /*
2317 : * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains
2318 : * the list of relation oids that are already present on the subscriber.
2319 : * This check should be skipped for these tables if checking for table
2320 : * sync scenario. However, when handling the retain_dead_tuples scenario,
2321 : * ensure all tables are checked, as some existing tables may now include
2322 : * changes from other origins due to newly created subscriptions on the
2323 : * publisher.
2324 : */
2325 26 : if (check_table_sync)
2326 : {
2327 24 : for (i = 0; i < subrel_count; i++)
2328 : {
2329 6 : Oid relid = subrel_local_oids[i];
2330 6 : char *schemaname = get_namespace_name(get_rel_namespace(relid));
2331 6 : char *tablename = get_rel_name(relid);
2332 :
2333 6 : appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
2334 : schemaname, tablename);
2335 : }
2336 : }
2337 :
2338 26 : res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
2339 26 : pfree(cmd.data);
2340 :
2341 26 : if (res->status != WALRCV_OK_TUPLES)
2342 0 : ereport(ERROR,
2343 : (errcode(ERRCODE_CONNECTION_FAILURE),
2344 : errmsg("could not receive list of replicated tables from the publisher: %s",
2345 : res->err)));
2346 :
2347 : /* Process tables. */
2348 26 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2349 36 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2350 : {
2351 : char *pubname;
2352 : bool isnull;
2353 :
2354 10 : pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2355 : Assert(!isnull);
2356 :
2357 10 : ExecClearTuple(slot);
2358 10 : publist = list_append_unique(publist, makeString(pubname));
2359 : }
2360 :
2361 : /*
2362 : * Log a warning if the publisher has subscribed to the same table from
2363 : * some other publisher. We cannot know the origin of data during the
2364 : * initial sync. Data origins can be found only from the WAL by looking at
2365 : * the origin id.
2366 : *
2367 : * XXX: For simplicity, we don't check whether the table has any data or
2368 : * not. If the table doesn't have any data then we don't need to
2369 : * distinguish between data having origin and data not having origin so we
2370 : * can avoid logging a warning for table sync scenario.
2371 : */
2372 26 : if (publist)
2373 : {
2374 10 : StringInfo pubnames = makeStringInfo();
2375 10 : StringInfo err_msg = makeStringInfo();
2376 10 : StringInfo err_hint = makeStringInfo();
2377 :
2378 : /* Prepare the list of publication(s) for warning message. */
2379 10 : GetPublicationsStr(publist, pubnames, false);
2380 :
2381 10 : if (check_table_sync)
2382 : {
2383 8 : appendStringInfo(err_msg, _("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin"),
2384 : subname);
2385 8 : appendStringInfoString(err_hint, _("Verify that initial data copied from the publisher tables did not come from other origins."));
2386 : }
2387 : else
2388 : {
2389 2 : appendStringInfo(err_msg, _("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins"),
2390 : subname);
2391 2 : appendStringInfoString(err_hint, _("Consider using origin = NONE or disabling retain_dead_tuples."));
2392 : }
2393 :
2394 10 : ereport(WARNING,
2395 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2396 : errmsg_internal("%s", err_msg->data),
2397 : errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2398 : "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2399 : list_length(publist), pubnames->data),
2400 : errhint_internal("%s", err_hint->data));
2401 : }
2402 :
2403 26 : ExecDropSingleTupleTableSlot(slot);
2404 :
2405 26 : walrcv_clear_result(res);
2406 : }
2407 :
2408 : /*
2409 : * Determine whether the retain_dead_tuples can be enabled based on the
2410 : * publisher's status.
2411 : *
2412 : * This option is disallowed if the publisher is running a version earlier
2413 : * than the PG19, or if the publisher is in recovery (i.e., it is a standby
2414 : * server).
2415 : *
2416 : * See comments atop worker.c for a detailed explanation.
2417 : */
2418 : static void
2419 16 : check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
2420 : {
2421 : WalRcvExecResult *res;
2422 16 : Oid RecoveryRow[1] = {BOOLOID};
2423 : TupleTableSlot *slot;
2424 : bool isnull;
2425 : bool remote_in_recovery;
2426 :
2427 16 : if (walrcv_server_version(wrconn) < 19000)
2428 0 : ereport(ERROR,
2429 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2430 : errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
2431 :
2432 16 : res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow);
2433 :
2434 16 : if (res->status != WALRCV_OK_TUPLES)
2435 0 : ereport(ERROR,
2436 : (errcode(ERRCODE_CONNECTION_FAILURE),
2437 : errmsg("could not obtain recovery progress from the publisher: %s",
2438 : res->err)));
2439 :
2440 16 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2441 16 : if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2442 0 : elog(ERROR, "failed to fetch tuple for the recovery progress");
2443 :
2444 16 : remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull));
2445 :
2446 16 : if (remote_in_recovery)
2447 0 : ereport(ERROR,
2448 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2449 : errmsg("cannot enable retain_dead_tuples if the publisher is in recovery."));
2450 :
2451 16 : ExecDropSingleTupleTableSlot(slot);
2452 :
2453 16 : walrcv_clear_result(res);
2454 16 : }
2455 :
2456 : /*
2457 : * Check if the subscriber's configuration is adequate to enable the
2458 : * retain_dead_tuples option.
2459 : *
2460 : * Issue an ERROR if the wal_level does not support the use of replication
2461 : * slots when check_guc is set to true.
2462 : *
2463 : * Issue a WARNING if track_commit_timestamp is not enabled when check_guc is
2464 : * set to true. This is only to highlight the importance of enabling
2465 : * track_commit_timestamp instead of catching all the misconfigurations, as
2466 : * this setting can be adjusted after subscription creation. Without it, the
2467 : * apply worker will simply skip conflict detection.
2468 : *
2469 : * Issue a WARNING or NOTICE if the subscription is disabled. Do not raise an
2470 : * ERROR since users can only modify retain_dead_tuples for disabled
2471 : * subscriptions. And as long as the subscription is enabled promptly, it will
2472 : * not pose issues.
2473 : */
2474 : void
2475 18 : CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
2476 : int elevel_for_sub_disabled)
2477 : {
2478 : Assert(elevel_for_sub_disabled == NOTICE ||
2479 : elevel_for_sub_disabled == WARNING);
2480 :
2481 18 : if (check_guc && wal_level < WAL_LEVEL_REPLICA)
2482 0 : ereport(ERROR,
2483 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2484 : errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
2485 : errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
2486 :
2487 18 : if (check_guc && !track_commit_timestamp)
2488 14 : ereport(WARNING,
2489 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2490 : errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
2491 : errhint("Consider setting \"%s\" to true.",
2492 : "track_commit_timestamp"));
2493 :
2494 18 : if (sub_disabled)
2495 8 : ereport(elevel_for_sub_disabled,
2496 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2497 : errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
2498 : (elevel_for_sub_disabled > NOTICE)
2499 : ? errhint("Consider setting %s to false.",
2500 : "retain_dead_tuples") : 0);
2501 18 : }
2502 :
2503 : /*
2504 : * Get the list of tables which belong to specified publications on the
2505 : * publisher connection.
2506 : *
2507 : * Note that we don't support the case where the column list is different for
2508 : * the same table in different publications to avoid sending unwanted column
2509 : * information for some of the rows. This can happen when both the column
2510 : * list and row filter are specified for different publications.
2511 : */
2512 : static List *
2513 300 : fetch_table_list(WalReceiverConn *wrconn, List *publications)
2514 : {
2515 : WalRcvExecResult *res;
2516 : StringInfoData cmd;
2517 : TupleTableSlot *slot;
2518 300 : Oid tableRow[3] = {TEXTOID, TEXTOID, InvalidOid};
2519 300 : List *tablelist = NIL;
2520 300 : int server_version = walrcv_server_version(wrconn);
2521 300 : bool check_columnlist = (server_version >= 150000);
2522 300 : StringInfo pub_names = makeStringInfo();
2523 :
2524 300 : initStringInfo(&cmd);
2525 :
2526 : /* Build the pub_names comma-separated string. */
2527 300 : GetPublicationsStr(publications, pub_names, true);
2528 :
2529 : /* Get the list of tables from the publisher. */
2530 300 : if (server_version >= 160000)
2531 : {
2532 300 : tableRow[2] = INT2VECTOROID;
2533 :
2534 : /*
2535 : * From version 16, we allowed passing multiple publications to the
2536 : * function pg_get_publication_tables. This helped to filter out the
2537 : * partition table whose ancestor is also published in this
2538 : * publication array.
2539 : *
2540 : * Join pg_get_publication_tables with pg_publication to exclude
2541 : * non-existing publications.
2542 : *
2543 : * Note that attrs are always stored in sorted order so we don't need
2544 : * to worry if different publications have specified them in a
2545 : * different order. See pub_collist_validate.
2546 : */
2547 300 : appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, gpt.attrs\n"
2548 : " FROM pg_class c\n"
2549 : " JOIN pg_namespace n ON n.oid = c.relnamespace\n"
2550 : " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
2551 : " FROM pg_publication\n"
2552 : " WHERE pubname IN ( %s )) AS gpt\n"
2553 : " ON gpt.relid = c.oid\n",
2554 : pub_names->data);
2555 : }
2556 : else
2557 : {
2558 0 : tableRow[2] = NAMEARRAYOID;
2559 0 : appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n");
2560 :
2561 : /* Get column lists for each relation if the publisher supports it */
2562 0 : if (check_columnlist)
2563 0 : appendStringInfoString(&cmd, ", t.attnames\n");
2564 :
2565 0 : appendStringInfo(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
2566 : " WHERE t.pubname IN ( %s )",
2567 : pub_names->data);
2568 : }
2569 :
2570 300 : destroyStringInfo(pub_names);
2571 :
2572 300 : res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow);
2573 300 : pfree(cmd.data);
2574 :
2575 300 : if (res->status != WALRCV_OK_TUPLES)
2576 0 : ereport(ERROR,
2577 : (errcode(ERRCODE_CONNECTION_FAILURE),
2578 : errmsg("could not receive list of replicated tables from the publisher: %s",
2579 : res->err)));
2580 :
2581 : /* Process tables. */
2582 300 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2583 838 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2584 : {
2585 : char *nspname;
2586 : char *relname;
2587 : bool isnull;
2588 : RangeVar *rv;
2589 :
2590 540 : nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2591 : Assert(!isnull);
2592 540 : relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
2593 : Assert(!isnull);
2594 :
2595 540 : rv = makeRangeVar(nspname, relname, -1);
2596 :
2597 540 : if (check_columnlist && list_member(tablelist, rv))
2598 2 : ereport(ERROR,
2599 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2600 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
2601 : nspname, relname));
2602 : else
2603 538 : tablelist = lappend(tablelist, rv);
2604 :
2605 538 : ExecClearTuple(slot);
2606 : }
2607 298 : ExecDropSingleTupleTableSlot(slot);
2608 :
2609 298 : walrcv_clear_result(res);
2610 :
2611 298 : return tablelist;
2612 : }
2613 :
2614 : /*
2615 : * This is to report the connection failure while dropping replication slots.
2616 : * Here, we report the WARNING for all tablesync slots so that user can drop
2617 : * them manually, if required.
2618 : */
2619 : static void
2620 0 : ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
2621 : {
2622 : ListCell *lc;
2623 :
2624 0 : foreach(lc, rstates)
2625 : {
2626 0 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
2627 0 : Oid relid = rstate->relid;
2628 :
2629 : /* Only cleanup resources of tablesync workers */
2630 0 : if (!OidIsValid(relid))
2631 0 : continue;
2632 :
2633 : /*
2634 : * Caller needs to ensure that relstate doesn't change underneath us.
2635 : * See DropSubscription where we get the relstates.
2636 : */
2637 0 : if (rstate->state != SUBREL_STATE_SYNCDONE)
2638 : {
2639 0 : char syncslotname[NAMEDATALEN] = {0};
2640 :
2641 0 : ReplicationSlotNameForTablesync(subid, relid, syncslotname,
2642 : sizeof(syncslotname));
2643 0 : elog(WARNING, "could not drop tablesync replication slot \"%s\"",
2644 : syncslotname);
2645 : }
2646 : }
2647 :
2648 0 : ereport(ERROR,
2649 : (errcode(ERRCODE_CONNECTION_FAILURE),
2650 : errmsg("could not connect to publisher when attempting to drop replication slot \"%s\": %s",
2651 : slotname, err),
2652 : /* translator: %s is an SQL ALTER command */
2653 : errhint("Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
2654 : "ALTER SUBSCRIPTION ... DISABLE",
2655 : "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
2656 : }
2657 :
2658 : /*
2659 : * Check for duplicates in the given list of publications and error out if
2660 : * found one. Add publications to datums as text datums, if datums is not
2661 : * NULL.
2662 : */
2663 : static void
2664 438 : check_duplicates_in_publist(List *publist, Datum *datums)
2665 : {
2666 : ListCell *cell;
2667 438 : int j = 0;
2668 :
2669 1000 : foreach(cell, publist)
2670 : {
2671 580 : char *name = strVal(lfirst(cell));
2672 : ListCell *pcell;
2673 :
2674 878 : foreach(pcell, publist)
2675 : {
2676 878 : char *pname = strVal(lfirst(pcell));
2677 :
2678 878 : if (pcell == cell)
2679 562 : break;
2680 :
2681 316 : if (strcmp(name, pname) == 0)
2682 18 : ereport(ERROR,
2683 : (errcode(ERRCODE_DUPLICATE_OBJECT),
2684 : errmsg("publication name \"%s\" used more than once",
2685 : pname)));
2686 : }
2687 :
2688 562 : if (datums)
2689 476 : datums[j++] = CStringGetTextDatum(name);
2690 : }
2691 420 : }
2692 :
2693 : /*
2694 : * Merge current subscription's publications and user-specified publications
2695 : * from ADD/DROP PUBLICATIONS.
2696 : *
2697 : * If addpub is true, we will add the list of publications into oldpublist.
2698 : * Otherwise, we will delete the list of publications from oldpublist. The
2699 : * returned list is a copy, oldpublist itself is not changed.
2700 : *
2701 : * subname is the subscription name, for error messages.
2702 : */
2703 : static List *
2704 54 : merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
2705 : {
2706 : ListCell *lc;
2707 :
2708 54 : oldpublist = list_copy(oldpublist);
2709 :
2710 54 : check_duplicates_in_publist(newpublist, NULL);
2711 :
2712 92 : foreach(lc, newpublist)
2713 : {
2714 68 : char *name = strVal(lfirst(lc));
2715 : ListCell *lc2;
2716 68 : bool found = false;
2717 :
2718 134 : foreach(lc2, oldpublist)
2719 : {
2720 110 : char *pubname = strVal(lfirst(lc2));
2721 :
2722 110 : if (strcmp(name, pubname) == 0)
2723 : {
2724 44 : found = true;
2725 44 : if (addpub)
2726 12 : ereport(ERROR,
2727 : (errcode(ERRCODE_DUPLICATE_OBJECT),
2728 : errmsg("publication \"%s\" is already in subscription \"%s\"",
2729 : name, subname)));
2730 : else
2731 32 : oldpublist = foreach_delete_current(oldpublist, lc2);
2732 :
2733 32 : break;
2734 : }
2735 : }
2736 :
2737 56 : if (addpub && !found)
2738 18 : oldpublist = lappend(oldpublist, makeString(name));
2739 38 : else if (!addpub && !found)
2740 6 : ereport(ERROR,
2741 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2742 : errmsg("publication \"%s\" is not in subscription \"%s\"",
2743 : name, subname)));
2744 : }
2745 :
2746 : /*
2747 : * XXX Probably no strong reason for this, but for now it's to make ALTER
2748 : * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
2749 : */
2750 24 : if (!oldpublist)
2751 6 : ereport(ERROR,
2752 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2753 : errmsg("cannot drop all the publications from a subscription")));
2754 :
2755 18 : return oldpublist;
2756 : }
2757 :
2758 : /*
2759 : * Extract the streaming mode value from a DefElem. This is like
2760 : * defGetBoolean() but also accepts the special value of "parallel".
2761 : */
2762 : char
2763 828 : defGetStreamingMode(DefElem *def)
2764 : {
2765 : /*
2766 : * If no parameter value given, assume "true" is meant.
2767 : */
2768 828 : if (!def->arg)
2769 0 : return LOGICALREP_STREAM_ON;
2770 :
2771 : /*
2772 : * Allow 0, 1, "false", "true", "off", "on" or "parallel".
2773 : */
2774 828 : switch (nodeTag(def->arg))
2775 : {
2776 0 : case T_Integer:
2777 0 : switch (intVal(def->arg))
2778 : {
2779 0 : case 0:
2780 0 : return LOGICALREP_STREAM_OFF;
2781 0 : case 1:
2782 0 : return LOGICALREP_STREAM_ON;
2783 0 : default:
2784 : /* otherwise, error out below */
2785 0 : break;
2786 : }
2787 0 : break;
2788 828 : default:
2789 : {
2790 828 : char *sval = defGetString(def);
2791 :
2792 : /*
2793 : * The set of strings accepted here should match up with the
2794 : * grammar's opt_boolean_or_string production.
2795 : */
2796 1650 : if (pg_strcasecmp(sval, "false") == 0 ||
2797 822 : pg_strcasecmp(sval, "off") == 0)
2798 12 : return LOGICALREP_STREAM_OFF;
2799 1614 : if (pg_strcasecmp(sval, "true") == 0 ||
2800 798 : pg_strcasecmp(sval, "on") == 0)
2801 90 : return LOGICALREP_STREAM_ON;
2802 726 : if (pg_strcasecmp(sval, "parallel") == 0)
2803 720 : return LOGICALREP_STREAM_PARALLEL;
2804 : }
2805 6 : break;
2806 : }
2807 :
2808 6 : ereport(ERROR,
2809 : (errcode(ERRCODE_SYNTAX_ERROR),
2810 : errmsg("%s requires a Boolean value or \"parallel\"",
2811 : def->defname)));
2812 : return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
2813 : }
|