Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * subscriptioncmds.c
4 : * subscription catalog manipulation functions
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/subscriptioncmds.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/commit_ts.h"
18 : #include "access/htup_details.h"
19 : #include "access/table.h"
20 : #include "access/twophase.h"
21 : #include "access/xact.h"
22 : #include "catalog/catalog.h"
23 : #include "catalog/dependency.h"
24 : #include "catalog/indexing.h"
25 : #include "catalog/namespace.h"
26 : #include "catalog/objectaccess.h"
27 : #include "catalog/objectaddress.h"
28 : #include "catalog/pg_authid_d.h"
29 : #include "catalog/pg_database_d.h"
30 : #include "catalog/pg_subscription.h"
31 : #include "catalog/pg_subscription_rel.h"
32 : #include "catalog/pg_type.h"
33 : #include "commands/defrem.h"
34 : #include "commands/event_trigger.h"
35 : #include "commands/subscriptioncmds.h"
36 : #include "executor/executor.h"
37 : #include "miscadmin.h"
38 : #include "nodes/makefuncs.h"
39 : #include "pgstat.h"
40 : #include "replication/logicallauncher.h"
41 : #include "replication/logicalworker.h"
42 : #include "replication/origin.h"
43 : #include "replication/slot.h"
44 : #include "replication/walreceiver.h"
45 : #include "replication/walsender.h"
46 : #include "replication/worker_internal.h"
47 : #include "storage/lmgr.h"
48 : #include "utils/acl.h"
49 : #include "utils/builtins.h"
50 : #include "utils/guc.h"
51 : #include "utils/lsyscache.h"
52 : #include "utils/memutils.h"
53 : #include "utils/pg_lsn.h"
54 : #include "utils/syscache.h"
55 :
56 : /*
57 : * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
58 : * command.
 *
 * Each flag occupies a distinct single bit, so any combination of them can
 * be OR'ed together into one bits32 mask (as done for the supported_opts
 * argument of parse_subscription_options() and for SubOpts.specified_opts).
59 : */
60 : #define SUBOPT_CONNECT 0x00000001
61 : #define SUBOPT_ENABLED 0x00000002
62 : #define SUBOPT_CREATE_SLOT 0x00000004
63 : #define SUBOPT_SLOT_NAME 0x00000008
64 : #define SUBOPT_COPY_DATA 0x00000010
65 : #define SUBOPT_SYNCHRONOUS_COMMIT 0x00000020
66 : #define SUBOPT_REFRESH 0x00000040
67 : #define SUBOPT_BINARY 0x00000080
68 : #define SUBOPT_STREAMING 0x00000100
69 : #define SUBOPT_TWOPHASE_COMMIT 0x00000200
70 : #define SUBOPT_DISABLE_ON_ERR 0x00000400
71 : #define SUBOPT_PASSWORD_REQUIRED 0x00000800
72 : #define SUBOPT_RUN_AS_OWNER 0x00001000
73 : #define SUBOPT_FAILOVER 0x00002000
74 : #define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000
75 : #define SUBOPT_LSN 0x00008000
76 : #define SUBOPT_ORIGIN 0x00010000
77 :
78 : /* check if the 'val' has 'bits' set */
/*
 * Note: this is true only when *every* bit of 'bits' is present in 'val'.
 * Passing an OR of several SUBOPT_* flags therefore tests that all of them
 * are set, not that any one of them is.
 */
79 : #define IsSet(val, bits) (((val) & (bits)) == (bits))
80 :
81 : /*
82 : * Structure to hold a bitmap representing the user-provided CREATE/ALTER
83 : * SUBSCRIPTION command options and the parsed/default values of each of them.
 *
 * parse_subscription_options() zeroes the whole struct first, so any field
 * whose option is neither supported by the command nor given by the user
 * is left as 0/false/NULL.
84 : */
85 : typedef struct SubOpts
86 : {
87 : bits32 specified_opts; /* SUBOPT_* bits for options the user gave explicitly */
88 : char *slot_name; /* NULL if unspecified or given as "none" */
89 : char *synchronous_commit; /* NULL unless specified by the user */
90 : bool connect; /* defaults to true when supported */
91 : bool enabled; /* defaults to true when supported */
92 : bool create_slot; /* defaults to true when supported */
93 : bool copy_data; /* defaults to true when supported */
94 : bool refresh; /* defaults to true when supported */
95 : bool binary; /* defaults to false */
96 : char streaming; /* defaults to LOGICALREP_STREAM_PARALLEL when supported */
97 : bool twophase; /* defaults to false */
98 : bool disableonerr; /* defaults to false */
99 : bool passwordrequired; /* defaults to true when supported */
100 : bool runasowner; /* defaults to false */
101 : bool failover; /* defaults to false */
102 : bool retaindeadtuples; /* defaults to false */
103 : char *origin; /* defaults to a copy of LOGICALREP_ORIGIN_ANY when supported */
104 : XLogRecPtr lsn; /* InvalidXLogRecPtr when given as "none" */
105 : } SubOpts;
106 :
/*
 * Forward declarations of file-local helpers that are used before their
 * definitions appear.
 */
107 : static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
108 : static void check_publications_origin(WalReceiverConn *wrconn,
109 : List *publications, bool copydata,
110 : bool retain_dead_tuples, char *origin,
111 : Oid *subrel_local_oids, int subrel_count,
112 : char *subname);
113 : static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn);
114 : static void check_duplicates_in_publist(List *publist, Datum *datums);
115 : static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
116 : static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
117 : static void CheckAlterSubOption(Subscription *sub, const char *option,
118 : bool slot_needs_update, bool isTopLevel);
119 :
119 :
120 :
121 : /*
122 : * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
123 : *
124 : * Since not all options can be specified in both commands, this function
125 : * will report an error if mutually exclusive options are specified.
 *
 * pstate         - handed to errorConflictingDefElem() when an option is
 *                  given more than once.
 * stmt_options   - list of DefElem nodes to parse (e.g. stmt->options).
 * supported_opts - mask of SUBOPT_* flags the calling command accepts; must
 *                  be non-zero.  An option name whose flag is not in this
 *                  mask falls through to the "unrecognized subscription
 *                  parameter" error.
 * opts           - output.  It is zeroed, then filled with the defaults of
 *                  the supported options, then overwritten with whatever the
 *                  user specified.  opts->specified_opts records which
 *                  options were given explicitly.
 *
 * All failures are reported with ereport(ERROR).
126 : */
127 : static void
128 926 : parse_subscription_options(ParseState *pstate, List *stmt_options,
129 : bits32 supported_opts, SubOpts *opts)
130 : {
131 : ListCell *lc;
132 :
133 : /* Start out with cleared opts. */
134 926 : memset(opts, 0, sizeof(SubOpts));
135 :
136 : /* caller must expect some option */
137 : Assert(supported_opts != 0);
138 :
139 : /* If connect option is supported, these others also need to be. */
140 : Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
141 : IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
142 : SUBOPT_COPY_DATA));
143 :
144 : /* Set default values for the supported options. */
145 926 : if (IsSet(supported_opts, SUBOPT_CONNECT))
146 468 : opts->connect = true;
147 926 : if (IsSet(supported_opts, SUBOPT_ENABLED))
148 564 : opts->enabled = true;
149 926 : if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
150 468 : opts->create_slot = true;
151 926 : if (IsSet(supported_opts, SUBOPT_COPY_DATA))
152 606 : opts->copy_data = true;
153 926 : if (IsSet(supported_opts, SUBOPT_REFRESH))
154 86 : opts->refresh = true;
155 926 : if (IsSet(supported_opts, SUBOPT_BINARY))
156 668 : opts->binary = false;
157 926 : if (IsSet(supported_opts, SUBOPT_STREAMING))
158 668 : opts->streaming = LOGICALREP_STREAM_PARALLEL;
159 926 : if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
160 668 : opts->twophase = false;
161 926 : if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
162 668 : opts->disableonerr = false;
163 926 : if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED))
164 668 : opts->passwordrequired = true;
165 926 : if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER))
166 668 : opts->runasowner = false;
167 926 : if (IsSet(supported_opts, SUBOPT_FAILOVER))
168 668 : opts->failover = false;
169 926 : if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES))
170 668 : opts->retaindeadtuples = false;
/*
 * The default origin is a palloc'd copy rather than the constant itself,
 * because the "origin" branch below pfree()s it before storing the
 * user-supplied value.
 */
171 926 : if (IsSet(supported_opts, SUBOPT_ORIGIN))
172 668 : opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
173 :
174 : /* Parse options */
/*
 * Every branch follows the same pattern: the option must be supported by
 * the caller and match by name; a repeated option is reported through
 * errorConflictingDefElem(); otherwise the flag is recorded in
 * specified_opts and the parsed value is stored.
 */
175 1830 : foreach(lc, stmt_options)
176 : {
177 964 : DefElem *defel = (DefElem *) lfirst(lc);
178 :
179 964 : if (IsSet(supported_opts, SUBOPT_CONNECT) &&
180 568 : strcmp(defel->defname, "connect") == 0)
181 : {
182 178 : if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
183 0 : errorConflictingDefElem(defel, pstate);
184 :
185 178 : opts->specified_opts |= SUBOPT_CONNECT;
186 178 : opts->connect = defGetBoolean(defel);
187 : }
188 786 : else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
189 486 : strcmp(defel->defname, "enabled") == 0)
190 : {
191 134 : if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
192 0 : errorConflictingDefElem(defel, pstate);
193 :
194 134 : opts->specified_opts |= SUBOPT_ENABLED;
195 134 : opts->enabled = defGetBoolean(defel);
196 : }
197 652 : else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
198 352 : strcmp(defel->defname, "create_slot") == 0)
199 : {
200 40 : if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
201 0 : errorConflictingDefElem(defel, pstate);
202 :
203 40 : opts->specified_opts |= SUBOPT_CREATE_SLOT;
204 40 : opts->create_slot = defGetBoolean(defel);
205 : }
206 612 : else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
207 516 : strcmp(defel->defname, "slot_name") == 0)
208 : {
209 150 : if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
210 0 : errorConflictingDefElem(defel, pstate);
211 :
212 150 : opts->specified_opts |= SUBOPT_SLOT_NAME;
213 150 : opts->slot_name = defGetString(defel);
214 :
215 : /* Setting slot_name = NONE is treated as no slot name. */
216 150 : if (strcmp(opts->slot_name, "none") == 0)
217 116 : opts->slot_name = NULL;
218 : else
219 34 : ReplicationSlotValidateName(opts->slot_name, false, ERROR);
220 : }
221 462 : else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
222 306 : strcmp(defel->defname, "copy_data") == 0)
223 : {
224 54 : if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
225 0 : errorConflictingDefElem(defel, pstate);
226 :
227 54 : opts->specified_opts |= SUBOPT_COPY_DATA;
228 54 : opts->copy_data = defGetBoolean(defel);
229 : }
230 408 : else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
231 318 : strcmp(defel->defname, "synchronous_commit") == 0)
232 : {
233 12 : if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
234 0 : errorConflictingDefElem(defel, pstate);
235 :
236 12 : opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
237 12 : opts->synchronous_commit = defGetString(defel);
238 :
239 : /* Test if the given value is valid for synchronous_commit GUC. */
240 12 : (void) set_config_option("synchronous_commit", opts->synchronous_commit,
241 : PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
242 : false, 0, false);
243 : }
244 396 : else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
245 66 : strcmp(defel->defname, "refresh") == 0)
246 : {
247 66 : if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
248 0 : errorConflictingDefElem(defel, pstate);
249 :
250 66 : opts->specified_opts |= SUBOPT_REFRESH;
251 66 : opts->refresh = defGetBoolean(defel);
252 : }
253 330 : else if (IsSet(supported_opts, SUBOPT_BINARY) &&
254 306 : strcmp(defel->defname, "binary") == 0)
255 : {
256 32 : if (IsSet(opts->specified_opts, SUBOPT_BINARY))
257 0 : errorConflictingDefElem(defel, pstate);
258 :
259 32 : opts->specified_opts |= SUBOPT_BINARY;
260 32 : opts->binary = defGetBoolean(defel);
261 : }
262 298 : else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
263 274 : strcmp(defel->defname, "streaming") == 0)
264 : {
265 74 : if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
266 0 : errorConflictingDefElem(defel, pstate);
267 :
268 74 : opts->specified_opts |= SUBOPT_STREAMING;
269 74 : opts->streaming = defGetStreamingMode(defel);
270 : }
271 224 : else if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
272 200 : strcmp(defel->defname, "two_phase") == 0)
273 : {
274 42 : if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
275 0 : errorConflictingDefElem(defel, pstate);
276 :
277 42 : opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
278 42 : opts->twophase = defGetBoolean(defel);
279 : }
280 182 : else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
281 158 : strcmp(defel->defname, "disable_on_error") == 0)
282 : {
283 20 : if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
284 0 : errorConflictingDefElem(defel, pstate);
285 :
286 20 : opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
287 20 : opts->disableonerr = defGetBoolean(defel);
288 : }
289 162 : else if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED) &&
290 138 : strcmp(defel->defname, "password_required") == 0)
291 : {
292 24 : if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
293 0 : errorConflictingDefElem(defel, pstate);
294 :
295 24 : opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
296 24 : opts->passwordrequired = defGetBoolean(defel);
297 : }
298 138 : else if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER) &&
299 114 : strcmp(defel->defname, "run_as_owner") == 0)
300 : {
301 18 : if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
302 0 : errorConflictingDefElem(defel, pstate);
303 :
304 18 : opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
305 18 : opts->runasowner = defGetBoolean(defel);
306 : }
307 120 : else if (IsSet(supported_opts, SUBOPT_FAILOVER) &&
308 96 : strcmp(defel->defname, "failover") == 0)
309 : {
310 24 : if (IsSet(opts->specified_opts, SUBOPT_FAILOVER))
311 0 : errorConflictingDefElem(defel, pstate);
312 :
313 24 : opts->specified_opts |= SUBOPT_FAILOVER;
314 24 : opts->failover = defGetBoolean(defel);
315 : }
316 96 : else if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES) &&
317 72 : strcmp(defel->defname, "retain_dead_tuples") == 0)
318 : {
319 24 : if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
320 0 : errorConflictingDefElem(defel, pstate);
321 :
322 24 : opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
323 24 : opts->retaindeadtuples = defGetBoolean(defel);
324 : }
325 72 : else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
326 48 : strcmp(defel->defname, "origin") == 0)
327 : {
328 42 : if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
329 0 : errorConflictingDefElem(defel, pstate);
330 :
331 42 : opts->specified_opts |= SUBOPT_ORIGIN;
/* Release the pstrdup'd default before replacing it. */
332 42 : pfree(opts->origin);
333 :
334 : /*
335 : * Even though the "origin" parameter allows only "none" and "any"
336 : * values, it is implemented as a string type so that the
337 : * parameter can be extended in future versions to support
338 : * filtering using origin names specified by the user.
339 : */
340 42 : opts->origin = defGetString(defel);
341 :
342 58 : if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
343 16 : (pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
344 6 : ereport(ERROR,
345 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
346 : errmsg("unrecognized origin value: \"%s\"", opts->origin));
347 : }
348 30 : else if (IsSet(supported_opts, SUBOPT_LSN) &&
349 24 : strcmp(defel->defname, "lsn") == 0)
350 18 : {
351 24 : char *lsn_str = defGetString(defel);
352 : XLogRecPtr lsn;
353 :
354 24 : if (IsSet(opts->specified_opts, SUBOPT_LSN))
355 0 : errorConflictingDefElem(defel, pstate);
356 :
357 : /* Setting lsn = NONE is treated as resetting LSN */
358 24 : if (strcmp(lsn_str, "none") == 0)
359 6 : lsn = InvalidXLogRecPtr;
360 : else
361 : {
362 : /* Parse the argument as LSN */
363 18 : lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
364 : CStringGetDatum(lsn_str)));
365 :
/*
 * An explicit LSN that parses to the invalid value is rejected; only the
 * keyword "none" may produce InvalidXLogRecPtr.
 */
366 18 : if (XLogRecPtrIsInvalid(lsn))
367 6 : ereport(ERROR,
368 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
369 : errmsg("invalid WAL location (LSN): %s", lsn_str)));
370 : }
371 :
372 18 : opts->specified_opts |= SUBOPT_LSN;
373 18 : opts->lsn = lsn;
374 : }
375 : else
376 6 : ereport(ERROR,
377 : (errcode(ERRCODE_SYNTAX_ERROR),
378 : errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
379 : }
380 :
381 : /*
382 : * We've been explicitly asked to not connect, that requires some
383 : * additional processing.
 *
 * The supported_opts test matters because opts->connect is also false
 * (from the memset above) for commands that do not support "connect" at
 * all; those must not enter this branch.
384 : */
385 866 : if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
386 : {
387 : /* Check for incompatible options from the user. */
388 142 : if (opts->enabled &&
389 142 : IsSet(opts->specified_opts, SUBOPT_ENABLED))
390 6 : ereport(ERROR,
391 : (errcode(ERRCODE_SYNTAX_ERROR),
392 : /*- translator: both %s are strings of the form "option = value" */
393 : errmsg("%s and %s are mutually exclusive options",
394 : "connect = false", "enabled = true")));
395 :
396 136 : if (opts->create_slot &&
397 130 : IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
398 6 : ereport(ERROR,
399 : (errcode(ERRCODE_SYNTAX_ERROR),
400 : errmsg("%s and %s are mutually exclusive options",
401 : "connect = false", "create_slot = true")));
402 :
403 130 : if (opts->copy_data &&
404 124 : IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
405 6 : ereport(ERROR,
406 : (errcode(ERRCODE_SYNTAX_ERROR),
407 : errmsg("%s and %s are mutually exclusive options",
408 : "connect = false", "copy_data = true")));
409 :
/*
 * Reaching this point means none of the three options was explicitly set
 * to true, so the assignments below only replace defaults (or restate an
 * explicit false).
 */
410 : /* Change the defaults of other options. */
411 124 : opts->enabled = false;
412 124 : opts->create_slot = false;
413 124 : opts->copy_data = false;
414 : }
415 :
416 : /*
417 : * Do additional checking for disallowed combination when slot_name = NONE
418 : * was used.
 *
 * A NULL slot_name together with the SUBOPT_SLOT_NAME bit identifies an
 * explicit "none"; a NULL slot_name without the bit just means the option
 * was not given.  For each conflicting option the message differs
 * depending on whether the user set it explicitly or it came from the
 * default.
419 : */
420 848 : if (!opts->slot_name &&
421 820 : IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
422 : {
423 110 : if (opts->enabled)
424 : {
425 18 : if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
426 6 : ereport(ERROR,
427 : (errcode(ERRCODE_SYNTAX_ERROR),
428 : /*- translator: both %s are strings of the form "option = value" */
429 : errmsg("%s and %s are mutually exclusive options",
430 : "slot_name = NONE", "enabled = true")));
431 : else
432 12 : ereport(ERROR,
433 : (errcode(ERRCODE_SYNTAX_ERROR),
434 : /*- translator: both %s are strings of the form "option = value" */
435 : errmsg("subscription with %s must also set %s",
436 : "slot_name = NONE", "enabled = false")));
437 : }
438 :
439 92 : if (opts->create_slot)
440 : {
441 12 : if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
442 6 : ereport(ERROR,
443 : (errcode(ERRCODE_SYNTAX_ERROR),
444 : /*- translator: both %s are strings of the form "option = value" */
445 : errmsg("%s and %s are mutually exclusive options",
446 : "slot_name = NONE", "create_slot = true")));
447 : else
448 6 : ereport(ERROR,
449 : (errcode(ERRCODE_SYNTAX_ERROR),
450 : /*- translator: both %s are strings of the form "option = value" */
451 : errmsg("subscription with %s must also set %s",
452 : "slot_name = NONE", "create_slot = false")));
453 : }
454 : }
455 818 : }
456 :
457 : /*
458 : * Check that the specified publications are present on the publisher.
 *
 * wrconn       - open connection to the publisher, used to run the query.
 * publications - list of publication names to look up; a copy is worked
 *                on, so this list itself is not modified here.
 *
 * Publications that the publisher does not report are listed in a single
 * WARNING (not an error).  A failure to run the query itself is reported
 * with ereport(ERROR).
459 : */
460 : static void
461 254 : check_publications(WalReceiverConn *wrconn, List *publications)
462 : {
463 : WalRcvExecResult *res;
464 : StringInfo cmd;
465 : TupleTableSlot *slot;
466 254 : List *publicationsCopy = NIL;
467 254 : Oid tableRow[1] = {TEXTOID};
468 :
/* Build a query returning those requested names that exist remotely. */
469 254 : cmd = makeStringInfo();
470 254 : appendStringInfoString(cmd, "SELECT t.pubname FROM\n"
471 : " pg_catalog.pg_publication t WHERE\n"
472 : " t.pubname IN (");
473 254 : GetPublicationsStr(publications, cmd, true);
474 254 : appendStringInfoChar(cmd, ')');
475 :
/* One result column of type text is expected (see tableRow). */
476 254 : res = walrcv_exec(wrconn, cmd->data, 1, tableRow);
477 254 : destroyStringInfo(cmd);
478 :
479 254 : if (res->status != WALRCV_OK_TUPLES)
480 0 : ereport(ERROR,
481 : errmsg("could not receive list of publications from the publisher: %s",
482 : res->err));
483 :
/*
 * Start from the full requested list and remove every name the publisher
 * returns; whatever remains afterwards does not exist remotely.
 */
484 254 : publicationsCopy = list_copy(publications);
485 :
486 : /* Process publication(s). */
487 254 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
488 560 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
489 : {
490 : char *pubname;
491 : bool isnull;
492 :
493 306 : pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
494 : Assert(!isnull);
495 :
496 : /* Delete the publication present in publisher from the list. */
/*
 * NOTE(review): a freshly built String node is passed, so list_delete()
 * presumably matches list members by value rather than by pointer --
 * confirm against its definition.
 */
497 306 : publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
498 306 : ExecClearTuple(slot);
499 : }
500 :
501 254 : ExecDropSingleTupleTableSlot(slot);
502 :
503 254 : walrcv_clear_result(res);
504 :
505 254 : if (list_length(publicationsCopy))
506 : {
507 : /* Prepare the list of non-existent publication(s) for error message. */
508 8 : StringInfo pubnames = makeStringInfo();
509 :
510 8 : GetPublicationsStr(publicationsCopy, pubnames, false);
511 8 : ereport(WARNING,
512 : errcode(ERRCODE_UNDEFINED_OBJECT),
513 : errmsg_plural("publication %s does not exist on the publisher",
514 : "publications %s do not exist on the publisher",
515 : list_length(publicationsCopy),
516 : pubnames->data));
517 : }
518 254 : }
519 :
520 : /*
521 : * Auxiliary function to build a text array out of a list of String nodes.
 *
 * publist - list of publication names.
 *
 * Returns the array as a Datum.  Scratch data lives in a private memory
 * context that is deleted before returning; the array itself is built
 * after switching back to the caller's context, so it outlives that
 * deletion.
522 : */
523 : static Datum
524 386 : publicationListToArray(List *publist)
525 : {
526 : ArrayType *arr;
527 : Datum *datums;
528 : MemoryContext memcxt;
529 : MemoryContext oldcxt;
530 :
531 : /* Create memory context for temporary allocations. */
532 386 : memcxt = AllocSetContextCreate(CurrentMemoryContext,
533 : "publicationListToArray to array",
534 : ALLOCSET_DEFAULT_SIZES);
535 386 : oldcxt = MemoryContextSwitchTo(memcxt);
536 :
/* One Datum slot per list entry, allocated in the temporary context. */
537 386 : datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
538 :
/*
 * NOTE(review): presumably this both rejects duplicate names and fills
 * datums[] with one value per publication; its definition is not visible
 * here -- confirm.
 */
539 386 : check_duplicates_in_publist(publist, datums);
540 :
/* Switch back first so that the result array is not in memcxt. */
541 380 : MemoryContextSwitchTo(oldcxt);
542 :
543 380 : arr = construct_array_builtin(datums, list_length(publist), TEXTOID);
544 :
545 380 : MemoryContextDelete(memcxt);
546 :
547 380 : return PointerGetDatum(arr);
548 : }
549 :
550 : /*
551 : * Create new subscription.
 *
 * pstate     - passed to parse_subscription_options() for error reporting.
 * stmt       - the parsed statement; supplies the subscription name,
 *              connection string, publication list and options.
 * isTopLevel - passed to PreventInTransactionBlock() when a replication
 *              slot is going to be created.
 *
 * Steps, in order: parse options, check privileges, insert the
 * pg_subscription row, create the replication origin and, unless
 * connect = false, contact the publisher to validate the publications,
 * register the published tables and optionally create the slot.
 *
 * Returns the ObjectAddress (SubscriptionRelationId, new subscription OID).
 * Failures are reported with ereport(ERROR).
552 : */
553 : ObjectAddress
554 468 : CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
555 : bool isTopLevel)
556 : {
557 : Relation rel;
558 : ObjectAddress myself;
559 : Oid subid;
560 : bool nulls[Natts_pg_subscription];
561 : Datum values[Natts_pg_subscription];
/* The new subscription is owned by the current user. */
562 468 : Oid owner = GetUserId();
563 : HeapTuple tup;
564 : char *conninfo;
565 : char originname[NAMEDATALEN];
566 : List *publications;
567 : bits32 supported_opts;
568 468 : SubOpts opts = {0};
569 : AclResult aclresult;
570 :
571 : /*
572 : * Parse and check options.
573 : *
574 : * Connection and publication should not be specified here.
575 : */
576 468 : supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
577 : SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
578 : SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
579 : SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
580 : SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
581 : SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
582 : SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN);
583 468 : parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
584 :
585 : /*
586 : * Since creating a replication slot is not transactional, rolling back
587 : * the transaction leaves the created replication slot. So we cannot run
588 : * CREATE SUBSCRIPTION inside a transaction block if creating a
589 : * replication slot.
590 : */
591 384 : if (opts.create_slot)
592 250 : PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
593 :
594 : /*
595 : * We don't want to allow unprivileged users to be able to trigger
596 : * attempts to access arbitrary network destinations, so require the user
597 : * to have been specifically authorized to create subscriptions.
598 : */
599 378 : if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
600 6 : ereport(ERROR,
601 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
602 : errmsg("permission denied to create subscription"),
603 : errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
604 : "pg_create_subscription")));
605 :
606 : /*
607 : * Since a subscription is a database object, we also check for CREATE
608 : * permission on the database.
609 : */
610 372 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
611 : owner, ACL_CREATE);
612 372 : if (aclresult != ACLCHECK_OK)
613 12 : aclcheck_error(aclresult, OBJECT_DATABASE,
614 6 : get_database_name(MyDatabaseId));
615 :
616 : /*
617 : * Non-superusers are required to set a password for authentication, and
618 : * that password must be used by the target server, but the superuser can
619 : * exempt a subscription from this requirement.
620 : */
621 366 : if (!opts.passwordrequired && !superuser_arg(owner))
622 6 : ereport(ERROR,
623 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
624 : errmsg("password_required=false is superuser-only"),
625 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
626 :
627 : /*
628 : * If built with appropriate switch, whine when regression-testing
629 : * conventions for subscription names are violated.
630 : */
631 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
632 : if (strncmp(stmt->subname, "regress_", 8) != 0)
633 : elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
634 : #endif
635 :
636 360 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
637 :
638 : /* Check if name is used */
/* The lookup key is (current database, name). */
639 360 : subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
640 : ObjectIdGetDatum(MyDatabaseId), CStringGetDatum(stmt->subname));
641 360 : if (OidIsValid(subid))
642 : {
643 6 : ereport(ERROR,
644 : (errcode(ERRCODE_DUPLICATE_OBJECT),
645 : errmsg("subscription \"%s\" already exists",
646 : stmt->subname)));
647 : }
648 :
649 : /* Ensure that we can enable retain_dead_tuples */
650 354 : if (opts.retaindeadtuples)
651 8 : CheckSubDeadTupleRetention(true, !opts.enabled, WARNING);
652 :
/*
 * When slot_name was not given at all, the slot is named after the
 * subscription.  An explicit slot_name = NONE sets the SUBOPT_SLOT_NAME
 * bit and therefore keeps slot_name NULL.
 */
653 354 : if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
654 312 : opts.slot_name == NULL)
655 312 : opts.slot_name = stmt->subname;
656 :
657 : /* The default for synchronous_commit of subscriptions is off. */
658 354 : if (opts.synchronous_commit == NULL)
659 354 : opts.synchronous_commit = "off";
660 :
661 354 : conninfo = stmt->conninfo;
662 354 : publications = stmt->publication;
663 :
664 : /* Load the library providing us libpq calls. */
665 354 : load_file("libpqwalreceiver", false);
666 :
667 : /* Check the connection info string. */
/*
 * NOTE(review): this uses superuser() while the neighbouring checks use
 * superuser_arg(owner); since owner is GetUserId() these presumably test
 * the same role -- confirm.
 */
668 354 : walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
669 :
670 : /* Everything ok, form a new tuple. */
671 336 : memset(values, 0, sizeof(values));
672 336 : memset(nulls, false, sizeof(nulls));
673 :
674 336 : subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
675 : Anum_pg_subscription_oid);
676 336 : values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
677 336 : values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
678 336 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
679 336 : values[Anum_pg_subscription_subname - 1] =
680 336 : DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
681 336 : values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
682 336 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
683 336 : values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
684 336 : values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
/*
 * A requested two_phase is stored as PENDING here; it is switched to
 * ENABLED further down only if the slot is created with two-phase on.
 */
685 336 : values[Anum_pg_subscription_subtwophasestate - 1] =
686 336 : CharGetDatum(opts.twophase ?
687 : LOGICALREP_TWOPHASE_STATE_PENDING :
688 : LOGICALREP_TWOPHASE_STATE_DISABLED);
689 336 : values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
690 336 : values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
691 336 : values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
692 336 : values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
693 336 : values[Anum_pg_subscription_subretaindeadtuples - 1] =
694 336 : BoolGetDatum(opts.retaindeadtuples);
695 336 : values[Anum_pg_subscription_subconninfo - 1] =
696 336 : CStringGetTextDatum(conninfo);
/* slot_name = NONE is stored as a NULL subslotname. */
697 336 : if (opts.slot_name)
698 316 : values[Anum_pg_subscription_subslotname - 1] =
699 316 : DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
700 : else
701 20 : nulls[Anum_pg_subscription_subslotname - 1] = true;
702 336 : values[Anum_pg_subscription_subsynccommit - 1] =
703 336 : CStringGetTextDatum(opts.synchronous_commit);
704 330 : values[Anum_pg_subscription_subpublications - 1] =
705 336 : publicationListToArray(publications);
706 330 : values[Anum_pg_subscription_suborigin - 1] =
707 330 : CStringGetTextDatum(opts.origin);
708 :
709 330 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
710 :
711 : /* Insert tuple into catalog. */
712 330 : CatalogTupleInsert(rel, tup);
713 330 : heap_freetuple(tup);
714 :
715 330 : recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
716 :
/* Create the replication origin whose name is derived from subid. */
717 330 : ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
718 330 : replorigin_create(originname);
719 :
720 : /*
721 : * Connect to remote side to execute requested commands and fetch table
722 : * info.
723 : */
724 330 : if (opts.connect)
725 : {
726 : char *err;
727 : WalReceiverConn *wrconn;
728 : List *tables;
729 : ListCell *lc;
730 : char table_state;
731 : bool must_use_password;
732 :
733 : /* Try to connect to the publisher. */
734 242 : must_use_password = !superuser_arg(owner) && opts.passwordrequired;
735 242 : wrconn = walrcv_connect(conninfo, true, true, must_use_password,
736 : stmt->subname, &err);
737 242 : if (!wrconn)
738 6 : ereport(ERROR,
739 : (errcode(ERRCODE_CONNECTION_FAILURE),
740 : errmsg("subscription \"%s\" could not connect to the publisher: %s",
741 : stmt->subname, err)));
742 :
/*
 * From here on the connection is open; the PG_FINALLY block below closes
 * it whether this section completes or raises an error.
 */
743 236 : PG_TRY();
744 : {
745 236 : check_publications(wrconn, publications);
746 236 : check_publications_origin(wrconn, publications, opts.copy_data,
747 236 : opts.retaindeadtuples, opts.origin,
748 : NULL, 0, stmt->subname);
749 :
750 236 : if (opts.retaindeadtuples)
751 6 : check_pub_dead_tuple_retention(wrconn);
752 :
753 : /*
754 : * Set sync state based on if we were asked to do data copy or
755 : * not.
756 : */
757 236 : table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
758 :
759 : /*
760 : * Get the table list from publisher and build local table status
761 : * info.
762 : */
763 236 : tables = fetch_table_list(wrconn, publications);
764 592 : foreach(lc, tables)
765 : {
766 358 : RangeVar *rv = (RangeVar *) lfirst(lc);
767 : Oid relid;
768 :
/* Resolve the published table's name to a local relation. */
769 358 : relid = RangeVarGetRelid(rv, AccessShareLock, false);
770 :
771 : /* Check for supported relkind. */
772 358 : CheckSubscriptionRelkind(get_rel_relkind(relid),
773 358 : rv->schemaname, rv->relname);
774 :
775 358 : AddSubscriptionRelState(subid, relid, table_state,
776 : InvalidXLogRecPtr, true);
777 : }
778 :
779 : /*
780 : * If requested, create permanent slot for the subscription. We
781 : * won't use the initial snapshot for anything, so no need to
782 : * export it.
783 : */
784 234 : if (opts.create_slot)
785 : {
786 224 : bool twophase_enabled = false;
787 :
788 : Assert(opts.slot_name);
789 :
790 : /*
791 : * Even if two_phase is set, don't create the slot with
792 : * two-phase enabled. Will enable it once all the tables are
793 : * synced and ready. This avoids race-conditions like prepared
794 : * transactions being skipped due to changes not being applied
795 : * due to checks in should_apply_changes_for_rel() when
796 : * tablesync for the corresponding tables are in progress. See
797 : * comments atop worker.c.
798 : *
799 : * Note that if tables were specified but copy_data is false
800 : * then it is safe to enable two_phase up-front because those
801 : * tables are already initially in READY state. When the
802 : * subscription has no tables, we leave the twophase state as
803 : * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
804 : * PUBLICATION to work.
805 : */
806 224 : if (opts.twophase && !opts.copy_data && tables != NIL)
807 2 : twophase_enabled = true;
808 :
809 224 : walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
810 : opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
811 :
812 224 : if (twophase_enabled)
813 2 : UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
814 :
815 224 : ereport(NOTICE,
816 : (errmsg("created replication slot \"%s\" on publisher",
817 : opts.slot_name)));
818 : }
819 : }
820 2 : PG_FINALLY();
821 : {
822 236 : walrcv_disconnect(wrconn);
823 : }
824 236 : PG_END_TRY();
825 : }
826 : else
827 88 : ereport(WARNING,
828 : (errmsg("subscription was created, but is not connected"),
829 : errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.")));
830 :
831 322 : table_close(rel, RowExclusiveLock);
832 :
833 322 : pgstat_create_subscription(subid);
834 :
/* The launcher is woken only for a subscription that is enabled. */
835 322 : if (opts.enabled)
836 220 : ApplyLauncherWakeupAtCommit();
837 :
838 322 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
839 :
840 322 : InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
841 :
842 322 : return myself;
843 : }
844 :
845 : static void
846 66 : AlterSubscription_refresh(Subscription *sub, bool copy_data,
847 : List *validate_publications)
848 : {
849 : char *err;
850 : List *pubrel_names;
851 : List *subrel_states;
852 : Oid *subrel_local_oids;
853 : Oid *pubrel_local_oids;
854 : ListCell *lc;
855 : int off;
856 : int remove_rel_len;
857 : int subrel_count;
858 66 : Relation rel = NULL;
859 : typedef struct SubRemoveRels
860 : {
861 : Oid relid;
862 : char state;
863 : } SubRemoveRels;
864 : SubRemoveRels *sub_remove_rels;
865 : WalReceiverConn *wrconn;
866 : bool must_use_password;
867 :
868 : /* Load the library providing us libpq calls. */
869 66 : load_file("libpqwalreceiver", false);
870 :
871 : /* Try to connect to the publisher. */
872 66 : must_use_password = sub->passwordrequired && !sub->ownersuperuser;
873 66 : wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
874 : sub->name, &err);
875 64 : if (!wrconn)
876 0 : ereport(ERROR,
877 : (errcode(ERRCODE_CONNECTION_FAILURE),
878 : errmsg("subscription \"%s\" could not connect to the publisher: %s",
879 : sub->name, err)));
880 :
881 64 : PG_TRY();
882 : {
883 64 : if (validate_publications)
884 18 : check_publications(wrconn, validate_publications);
885 :
886 : /* Get the table list from publisher. */
887 64 : pubrel_names = fetch_table_list(wrconn, sub->publications);
888 :
889 : /* Get local table list. */
890 64 : subrel_states = GetSubscriptionRelations(sub->oid, false);
891 64 : subrel_count = list_length(subrel_states);
892 :
893 : /*
894 : * Build qsorted array of local table oids for faster lookup. This can
895 : * potentially contain all tables in the database so speed of lookup
896 : * is important.
897 : */
898 64 : subrel_local_oids = palloc(subrel_count * sizeof(Oid));
899 64 : off = 0;
900 238 : foreach(lc, subrel_states)
901 : {
902 174 : SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
903 :
904 174 : subrel_local_oids[off++] = relstate->relid;
905 : }
906 64 : qsort(subrel_local_oids, subrel_count,
907 : sizeof(Oid), oid_cmp);
908 :
909 64 : check_publications_origin(wrconn, sub->publications, copy_data,
910 64 : sub->retaindeadtuples, sub->origin,
911 : subrel_local_oids, subrel_count, sub->name);
912 :
913 : /*
914 : * Rels that we want to remove from subscription and drop any slots
915 : * and origins corresponding to them.
916 : */
917 64 : sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels));
918 :
919 : /*
920 : * Walk over the remote tables and try to match them to locally known
921 : * tables. If the table is not known locally create a new state for
922 : * it.
923 : *
924 : * Also builds array of local oids of remote tables for the next step.
925 : */
926 64 : off = 0;
927 64 : pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
928 :
929 242 : foreach(lc, pubrel_names)
930 : {
931 178 : RangeVar *rv = (RangeVar *) lfirst(lc);
932 : Oid relid;
933 :
934 178 : relid = RangeVarGetRelid(rv, AccessShareLock, false);
935 :
936 : /* Check for supported relkind. */
937 178 : CheckSubscriptionRelkind(get_rel_relkind(relid),
938 178 : rv->schemaname, rv->relname);
939 :
940 178 : pubrel_local_oids[off++] = relid;
941 :
942 178 : if (!bsearch(&relid, subrel_local_oids,
943 : subrel_count, sizeof(Oid), oid_cmp))
944 : {
945 44 : AddSubscriptionRelState(sub->oid, relid,
946 : copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
947 : InvalidXLogRecPtr, true);
948 44 : ereport(DEBUG1,
949 : (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
950 : rv->schemaname, rv->relname, sub->name)));
951 : }
952 : }
953 :
954 : /*
955 : * Next remove state for tables we should not care about anymore using
956 : * the data we collected above
957 : */
958 64 : qsort(pubrel_local_oids, list_length(pubrel_names),
959 : sizeof(Oid), oid_cmp);
960 :
961 64 : remove_rel_len = 0;
962 238 : for (off = 0; off < subrel_count; off++)
963 : {
964 174 : Oid relid = subrel_local_oids[off];
965 :
966 174 : if (!bsearch(&relid, pubrel_local_oids,
967 174 : list_length(pubrel_names), sizeof(Oid), oid_cmp))
968 : {
969 : char state;
970 : XLogRecPtr statelsn;
971 :
972 : /*
973 : * Lock pg_subscription_rel with AccessExclusiveLock to
974 : * prevent any race conditions with the apply worker
975 : * re-launching workers at the same time this code is trying
976 : * to remove those tables.
977 : *
978 : * Even if new worker for this particular rel is restarted it
979 : * won't be able to make any progress as we hold exclusive
980 : * lock on pg_subscription_rel till the transaction end. It
981 : * will simply exit as there is no corresponding rel entry.
982 : *
983 : * This locking also ensures that the state of rels won't
984 : * change till we are done with this refresh operation.
985 : */
986 40 : if (!rel)
987 16 : rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
988 :
989 : /* Last known rel state. */
990 40 : state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
991 :
992 40 : sub_remove_rels[remove_rel_len].relid = relid;
993 40 : sub_remove_rels[remove_rel_len++].state = state;
994 :
995 40 : RemoveSubscriptionRel(sub->oid, relid);
996 :
997 40 : logicalrep_worker_stop(sub->oid, relid);
998 :
999 : /*
1000 : * For READY state, we would have already dropped the
1001 : * tablesync origin.
1002 : */
1003 40 : if (state != SUBREL_STATE_READY)
1004 : {
1005 : char originname[NAMEDATALEN];
1006 :
1007 : /*
1008 : * Drop the tablesync's origin tracking if exists.
1009 : *
1010 : * It is possible that the origin is not yet created for
1011 : * tablesync worker, this can happen for the states before
1012 : * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or
1013 : * apply worker can also concurrently try to drop the
1014 : * origin and by this time the origin might be already
1015 : * removed. For these reasons, passing missing_ok = true.
1016 : */
1017 0 : ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
1018 : sizeof(originname));
1019 0 : replorigin_drop_by_name(originname, true, false);
1020 : }
1021 :
1022 40 : ereport(DEBUG1,
1023 : (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
1024 : get_namespace_name(get_rel_namespace(relid)),
1025 : get_rel_name(relid),
1026 : sub->name)));
1027 : }
1028 : }
1029 :
1030 : /*
1031 : * Drop the tablesync slots associated with removed tables. This has
1032 : * to be at the end because otherwise if there is an error while doing
1033 : * the database operations we won't be able to rollback dropped slots.
1034 : */
1035 104 : for (off = 0; off < remove_rel_len; off++)
1036 : {
1037 40 : if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
1038 0 : sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
1039 : {
1040 0 : char syncslotname[NAMEDATALEN] = {0};
1041 :
1042 : /*
1043 : * For READY/SYNCDONE states we know the tablesync slot has
1044 : * already been dropped by the tablesync worker.
1045 : *
1046 : * For other states, there is no certainty, maybe the slot
1047 : * does not exist yet. Also, if we fail after removing some of
1048 : * the slots, next time, it will again try to drop already
1049 : * dropped slots and fail. For these reasons, we allow
1050 : * missing_ok = true for the drop.
1051 : */
1052 0 : ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid,
1053 : syncslotname, sizeof(syncslotname));
1054 0 : ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
1055 : }
1056 : }
1057 : }
1058 0 : PG_FINALLY();
1059 : {
1060 64 : walrcv_disconnect(wrconn);
1061 : }
1062 64 : PG_END_TRY();
1063 :
1064 64 : if (rel)
1065 16 : table_close(rel, NoLock);
1066 64 : }
1067 :
1068 : /*
1069 : * Common checks for altering failover, two_phase, and retain_dead_tuples
1070 : * options.
1071 : */
1072 : static void
1073 24 : CheckAlterSubOption(Subscription *sub, const char *option,
1074 : bool slot_needs_update, bool isTopLevel)
1075 : {
1076 : Assert(strcmp(option, "failover") == 0 ||
1077 : strcmp(option, "two_phase") == 0 ||
1078 : strcmp(option, "retain_dead_tuples") == 0);
1079 :
1080 : /*
1081 : * Altering the retain_dead_tuples option does not update the slot on the
1082 : * publisher.
1083 : */
1084 : Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0);
1085 :
1086 : /*
1087 : * Do not allow changing the option if the subscription is enabled. This
1088 : * is because both failover and two_phase options of the slot on the
1089 : * publisher cannot be modified if the slot is currently acquired by the
1090 : * existing walsender.
1091 : *
1092 : * Note that two_phase is enabled (aka changed from 'false' to 'true') on
1093 : * the publisher by the existing walsender, so we could have allowed that
1094 : * even when the subscription is enabled. But we kept this restriction for
1095 : * the sake of consistency and simplicity.
1096 : *
1097 : * Additionally, do not allow changing the retain_dead_tuples option when
1098 : * the subscription is enabled to prevent race conditions arising from the
1099 : * new option value being acknowledged asynchronously by the launcher and
1100 : * apply workers.
1101 : *
1102 : * Without the restriction, a race condition may arise when a user
1103 : * disables and immediately re-enables the retain_dead_tuples option. In
1104 : * this case, the launcher might drop the slot upon noticing the disabled
1105 : * action, while the apply worker may keep maintaining
1106 : * oldest_nonremovable_xid without noticing the option change. During this
1107 : * period, a transaction ID wraparound could falsely make this ID appear
1108 : * as if it originates from the future w.r.t the transaction ID stored in
1109 : * the slot maintained by launcher.
1110 : *
1111 : * Similarly, if the user enables retain_dead_tuples concurrently with the
1112 : * launcher starting the worker, the apply worker may start calculating
1113 : * oldest_nonremovable_xid before the launcher notices the enable action.
1114 : * Consequently, the launcher may update slot.xmin to a newer value than
1115 : * that maintained by the worker. In subsequent cycles, upon integrating
1116 : * the worker's oldest_nonremovable_xid, the launcher might detect a
1117 : * retreat in the calculated xmin, necessitating additional handling.
1118 : *
1119 : * XXX To address the above race conditions, we can define
1120 : * oldest_nonremovable_xid as FullTransactionID and adds the check to
1121 : * disallow retreating the conflict slot's xmin. For now, we kept the
1122 : * implementation simple by disallowing change to the retain_dead_tuples,
1123 : * but in the future we can change this after some more analysis.
1124 : *
1125 : * Note that we could restrict only the enabling of retain_dead_tuples to
1126 : * avoid the race conditions described above, but we maintain the
1127 : * restriction for both enable and disable operations for the sake of
1128 : * consistency.
1129 : */
1130 24 : if (sub->enabled)
1131 4 : ereport(ERROR,
1132 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1133 : errmsg("cannot set option \"%s\" for enabled subscription",
1134 : option)));
1135 :
1136 20 : if (slot_needs_update)
1137 : {
1138 : StringInfoData cmd;
1139 :
1140 : /*
1141 : * A valid slot must be associated with the subscription for us to
1142 : * modify any of the slot's properties.
1143 : */
1144 14 : if (!sub->slotname)
1145 0 : ereport(ERROR,
1146 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1147 : errmsg("cannot set option \"%s\" for a subscription that does not have a slot name",
1148 : option)));
1149 :
1150 : /* The changed option of the slot can't be rolled back. */
1151 14 : initStringInfo(&cmd);
1152 14 : appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
1153 :
1154 14 : PreventInTransactionBlock(isTopLevel, cmd.data);
1155 8 : pfree(cmd.data);
1156 : }
1157 14 : }
1158 :
1159 : /*
1160 : * Alter the existing subscription.
1161 : */
1162 : ObjectAddress
1163 490 : AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1164 : bool isTopLevel)
1165 : {
1166 : Relation rel;
1167 : ObjectAddress myself;
1168 : bool nulls[Natts_pg_subscription];
1169 : bool replaces[Natts_pg_subscription];
1170 : Datum values[Natts_pg_subscription];
1171 : HeapTuple tup;
1172 : Oid subid;
1173 490 : bool update_tuple = false;
1174 490 : bool update_failover = false;
1175 490 : bool update_two_phase = false;
1176 490 : bool check_pub_rdt = false;
1177 : bool retain_dead_tuples;
1178 : char *origin;
1179 : Subscription *sub;
1180 : Form_pg_subscription form;
1181 : bits32 supported_opts;
1182 490 : SubOpts opts = {0};
1183 :
1184 490 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1185 :
1186 : /* Fetch the existing tuple. */
1187 490 : tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
1188 : CStringGetDatum(stmt->subname));
1189 :
1190 490 : if (!HeapTupleIsValid(tup))
1191 6 : ereport(ERROR,
1192 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1193 : errmsg("subscription \"%s\" does not exist",
1194 : stmt->subname)));
1195 :
1196 484 : form = (Form_pg_subscription) GETSTRUCT(tup);
1197 484 : subid = form->oid;
1198 :
1199 : /* must be owner */
1200 484 : if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
1201 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
1202 0 : stmt->subname);
1203 :
1204 484 : sub = GetSubscription(subid, false);
1205 :
1206 484 : retain_dead_tuples = sub->retaindeadtuples;
1207 484 : origin = sub->origin;
1208 :
1209 : /*
1210 : * Don't allow non-superuser modification of a subscription with
1211 : * password_required=false.
1212 : */
1213 484 : if (!sub->passwordrequired && !superuser())
1214 0 : ereport(ERROR,
1215 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1216 : errmsg("password_required=false is superuser-only"),
1217 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1218 :
1219 : /* Lock the subscription so nobody else can do anything with it. */
1220 484 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
1221 :
1222 : /* Form a new tuple. */
1223 484 : memset(values, 0, sizeof(values));
1224 484 : memset(nulls, false, sizeof(nulls));
1225 484 : memset(replaces, false, sizeof(replaces));
1226 :
1227 484 : switch (stmt->kind)
1228 : {
1229 200 : case ALTER_SUBSCRIPTION_OPTIONS:
1230 : {
1231 200 : supported_opts = (SUBOPT_SLOT_NAME |
1232 : SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
1233 : SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
1234 : SUBOPT_DISABLE_ON_ERR |
1235 : SUBOPT_PASSWORD_REQUIRED |
1236 : SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
1237 : SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN);
1238 :
1239 200 : parse_subscription_options(pstate, stmt->options,
1240 : supported_opts, &opts);
1241 :
1242 182 : if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1243 : {
1244 : /*
1245 : * The subscription must be disabled to allow slot_name as
1246 : * 'none', otherwise, the apply worker will repeatedly try
1247 : * to stream the data using that slot_name which neither
1248 : * exists on the publisher nor the user will be allowed to
1249 : * create it.
1250 : */
1251 66 : if (sub->enabled && !opts.slot_name)
1252 0 : ereport(ERROR,
1253 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1254 : errmsg("cannot set %s for enabled subscription",
1255 : "slot_name = NONE")));
1256 :
1257 66 : if (opts.slot_name)
1258 6 : values[Anum_pg_subscription_subslotname - 1] =
1259 6 : DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
1260 : else
1261 60 : nulls[Anum_pg_subscription_subslotname - 1] = true;
1262 66 : replaces[Anum_pg_subscription_subslotname - 1] = true;
1263 : }
1264 :
1265 182 : if (opts.synchronous_commit)
1266 : {
1267 6 : values[Anum_pg_subscription_subsynccommit - 1] =
1268 6 : CStringGetTextDatum(opts.synchronous_commit);
1269 6 : replaces[Anum_pg_subscription_subsynccommit - 1] = true;
1270 : }
1271 :
1272 182 : if (IsSet(opts.specified_opts, SUBOPT_BINARY))
1273 : {
1274 18 : values[Anum_pg_subscription_subbinary - 1] =
1275 18 : BoolGetDatum(opts.binary);
1276 18 : replaces[Anum_pg_subscription_subbinary - 1] = true;
1277 : }
1278 :
1279 182 : if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
1280 : {
1281 30 : values[Anum_pg_subscription_substream - 1] =
1282 30 : CharGetDatum(opts.streaming);
1283 30 : replaces[Anum_pg_subscription_substream - 1] = true;
1284 : }
1285 :
1286 182 : if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
1287 : {
1288 : values[Anum_pg_subscription_subdisableonerr - 1]
1289 6 : = BoolGetDatum(opts.disableonerr);
1290 : replaces[Anum_pg_subscription_subdisableonerr - 1]
1291 6 : = true;
1292 : }
1293 :
1294 182 : if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
1295 : {
1296 : /* Non-superuser may not disable password_required. */
1297 12 : if (!opts.passwordrequired && !superuser())
1298 0 : ereport(ERROR,
1299 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1300 : errmsg("password_required=false is superuser-only"),
1301 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1302 :
1303 : values[Anum_pg_subscription_subpasswordrequired - 1]
1304 12 : = BoolGetDatum(opts.passwordrequired);
1305 : replaces[Anum_pg_subscription_subpasswordrequired - 1]
1306 12 : = true;
1307 : }
1308 :
1309 182 : if (IsSet(opts.specified_opts, SUBOPT_RUN_AS_OWNER))
1310 : {
1311 14 : values[Anum_pg_subscription_subrunasowner - 1] =
1312 14 : BoolGetDatum(opts.runasowner);
1313 14 : replaces[Anum_pg_subscription_subrunasowner - 1] = true;
1314 : }
1315 :
1316 182 : if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
1317 : {
1318 : /*
1319 : * We need to update both the slot and the subscription
1320 : * for the two_phase option. We can enable the two_phase
1321 : * option for a slot only once the initial data
1322 : * synchronization is done. This is to avoid missing some
1323 : * data as explained in comments atop worker.c.
1324 : */
1325 6 : update_two_phase = !opts.twophase;
1326 :
1327 6 : CheckAlterSubOption(sub, "two_phase", update_two_phase,
1328 : isTopLevel);
1329 :
1330 : /*
1331 : * Modifying the two_phase slot option requires a slot
1332 : * lookup by slot name, so changing the slot name at the
1333 : * same time is not allowed.
1334 : */
1335 6 : if (update_two_phase &&
1336 2 : IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1337 0 : ereport(ERROR,
1338 : (errcode(ERRCODE_SYNTAX_ERROR),
1339 : errmsg("\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
1340 :
1341 : /*
1342 : * Note that workers may still survive even if the
1343 : * subscription has been disabled.
1344 : *
1345 : * Ensure workers have already been exited to avoid
1346 : * getting prepared transactions while we are disabling
1347 : * the two_phase option. Otherwise, the changes of an
1348 : * already prepared transaction can be replicated again
1349 : * along with its corresponding commit, leading to
1350 : * duplicate data or errors.
1351 : */
1352 6 : if (logicalrep_workers_find(subid, true, true))
1353 0 : ereport(ERROR,
1354 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1355 : errmsg("cannot alter \"two_phase\" when logical replication worker is still running"),
1356 : errhint("Try again after some time.")));
1357 :
1358 : /*
1359 : * two_phase cannot be disabled if there are any
1360 : * uncommitted prepared transactions present otherwise it
1361 : * can lead to duplicate data or errors as explained in
1362 : * the comment above.
1363 : */
1364 6 : if (update_two_phase &&
1365 2 : sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
1366 2 : LookupGXactBySubid(subid))
1367 0 : ereport(ERROR,
1368 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1369 : errmsg("cannot disable \"two_phase\" when prepared transactions exist"),
1370 : errhint("Resolve these transactions and try again.")));
1371 :
1372 : /* Change system catalog accordingly */
1373 6 : values[Anum_pg_subscription_subtwophasestate - 1] =
1374 6 : CharGetDatum(opts.twophase ?
1375 : LOGICALREP_TWOPHASE_STATE_PENDING :
1376 : LOGICALREP_TWOPHASE_STATE_DISABLED);
1377 6 : replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1378 : }
1379 :
1380 182 : if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
1381 : {
1382 : /*
1383 : * Similar to the two_phase case above, we need to update
1384 : * the failover option for both the slot and the
1385 : * subscription.
1386 : */
1387 14 : update_failover = true;
1388 :
1389 14 : CheckAlterSubOption(sub, "failover", update_failover,
1390 : isTopLevel);
1391 :
1392 6 : values[Anum_pg_subscription_subfailover - 1] =
1393 6 : BoolGetDatum(opts.failover);
1394 6 : replaces[Anum_pg_subscription_subfailover - 1] = true;
1395 : }
1396 :
1397 174 : if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
1398 : {
1399 4 : values[Anum_pg_subscription_subretaindeadtuples - 1] =
1400 4 : BoolGetDatum(opts.retaindeadtuples);
1401 4 : replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true;
1402 :
1403 4 : CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
1404 :
1405 : /*
1406 : * Workers may continue running even after the
1407 : * subscription has been disabled.
1408 : *
1409 : * To prevent race conditions (as described in
1410 : * CheckAlterSubOption()), ensure that all worker
1411 : * processes have already exited before proceeding.
1412 : */
1413 2 : if (logicalrep_workers_find(subid, true, true))
1414 0 : ereport(ERROR,
1415 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1416 : errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
1417 : errhint("Try again after some time.")));
1418 :
1419 : /*
1420 : * Remind the user that enabling subscription will prevent
1421 : * the accumulation of dead tuples.
1422 : */
1423 2 : if (opts.retaindeadtuples)
1424 2 : CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE);
1425 :
1426 : /*
1427 : * Notify the launcher to manage the replication slot for
1428 : * conflict detection. This ensures that replication slot
1429 : * is efficiently handled (created, updated, or dropped)
1430 : * in response to any configuration changes.
1431 : */
1432 2 : ApplyLauncherWakeupAtCommit();
1433 :
1434 2 : check_pub_rdt = opts.retaindeadtuples;
1435 2 : retain_dead_tuples = opts.retaindeadtuples;
1436 : }
1437 :
1438 172 : if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
1439 : {
1440 10 : values[Anum_pg_subscription_suborigin - 1] =
1441 10 : CStringGetTextDatum(opts.origin);
1442 10 : replaces[Anum_pg_subscription_suborigin - 1] = true;
1443 :
1444 : /*
1445 : * Check if changes from different origins may be received
1446 : * from the publisher when the origin is changed to ANY
1447 : * and retain_dead_tuples is enabled.
1448 : */
1449 14 : check_pub_rdt = retain_dead_tuples &&
1450 4 : pg_strcasecmp(opts.origin, LOGICALREP_ORIGIN_ANY) == 0;
1451 :
1452 10 : origin = opts.origin;
1453 : }
1454 :
1455 172 : update_tuple = true;
1456 172 : break;
1457 : }
1458 :
1459 96 : case ALTER_SUBSCRIPTION_ENABLED:
1460 : {
1461 96 : parse_subscription_options(pstate, stmt->options,
1462 : SUBOPT_ENABLED, &opts);
1463 : Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
1464 :
1465 96 : if (!sub->slotname && opts.enabled)
1466 6 : ereport(ERROR,
1467 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1468 : errmsg("cannot enable subscription that does not have a slot name")));
1469 :
1470 : /*
1471 : * Check track_commit_timestamp only when enabling the
1472 : * subscription in case it was disabled after creation. See
1473 : * comments atop CheckSubDeadTupleRetention() for details.
1474 : */
1475 90 : if (sub->retaindeadtuples)
1476 12 : CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
1477 12 : WARNING);
1478 :
1479 90 : values[Anum_pg_subscription_subenabled - 1] =
1480 90 : BoolGetDatum(opts.enabled);
1481 90 : replaces[Anum_pg_subscription_subenabled - 1] = true;
1482 :
1483 90 : if (opts.enabled)
1484 50 : ApplyLauncherWakeupAtCommit();
1485 :
1486 90 : update_tuple = true;
1487 :
1488 : /*
1489 : * The subscription might be initially created with
1490 : * connect=false and retain_dead_tuples=true, meaning the
1491 : * remote server's status may not be checked. Ensure this
1492 : * check is conducted now.
1493 : */
1494 90 : check_pub_rdt = sub->retaindeadtuples && opts.enabled;
1495 90 : break;
1496 : }
1497 :
1498 20 : case ALTER_SUBSCRIPTION_CONNECTION:
1499 : /* Load the library providing us libpq calls. */
1500 20 : load_file("libpqwalreceiver", false);
1501 : /* Check the connection info string. */
1502 20 : walrcv_check_conninfo(stmt->conninfo,
1503 : sub->passwordrequired && !sub->ownersuperuser);
1504 :
1505 14 : values[Anum_pg_subscription_subconninfo - 1] =
1506 14 : CStringGetTextDatum(stmt->conninfo);
1507 14 : replaces[Anum_pg_subscription_subconninfo - 1] = true;
1508 14 : update_tuple = true;
1509 :
1510 : /*
1511 : * Since the remote server configuration might have changed,
1512 : * perform a check to ensure it permits enabling
1513 : * retain_dead_tuples.
1514 : */
1515 14 : check_pub_rdt = sub->retaindeadtuples;
1516 14 : break;
1517 :
1518 32 : case ALTER_SUBSCRIPTION_SET_PUBLICATION:
1519 : {
1520 32 : supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
1521 32 : parse_subscription_options(pstate, stmt->options,
1522 : supported_opts, &opts);
1523 :
1524 32 : values[Anum_pg_subscription_subpublications - 1] =
1525 32 : publicationListToArray(stmt->publication);
1526 32 : replaces[Anum_pg_subscription_subpublications - 1] = true;
1527 :
1528 32 : update_tuple = true;
1529 :
1530 : /* Refresh if user asked us to. */
1531 32 : if (opts.refresh)
1532 : {
1533 26 : if (!sub->enabled)
1534 0 : ereport(ERROR,
1535 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1536 : errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1537 : errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
1538 :
1539 : /*
1540 : * See ALTER_SUBSCRIPTION_REFRESH for details why this is
1541 : * not allowed.
1542 : */
1543 26 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1544 0 : ereport(ERROR,
1545 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1546 : errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1547 : errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1548 :
1549 26 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1550 :
1551 : /* Make sure refresh sees the new list of publications. */
1552 14 : sub->publications = stmt->publication;
1553 :
1554 14 : AlterSubscription_refresh(sub, opts.copy_data,
1555 : stmt->publication);
1556 : }
1557 :
1558 20 : break;
1559 : }
1560 :
1561 54 : case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
1562 : case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
1563 : {
1564 : List *publist;
1565 54 : bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
1566 :
1567 54 : supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
1568 54 : parse_subscription_options(pstate, stmt->options,
1569 : supported_opts, &opts);
1570 :
1571 54 : publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
1572 18 : values[Anum_pg_subscription_subpublications - 1] =
1573 18 : publicationListToArray(publist);
1574 18 : replaces[Anum_pg_subscription_subpublications - 1] = true;
1575 :
1576 18 : update_tuple = true;
1577 :
1578 : /* Refresh if user asked us to. */
1579 18 : if (opts.refresh)
1580 : {
1581 : /* We only need to validate user specified publications. */
1582 6 : List *validate_publications = (isadd) ? stmt->publication : NULL;
1583 :
1584 6 : if (!sub->enabled)
1585 0 : ereport(ERROR,
1586 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1587 : errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1588 : /* translator: %s is an SQL ALTER command */
1589 : errhint("Use %s instead.",
1590 : isadd ?
1591 : "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
1592 : "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
1593 :
1594 : /*
1595 : * See ALTER_SUBSCRIPTION_REFRESH for details why this is
1596 : * not allowed.
1597 : */
1598 6 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1599 0 : ereport(ERROR,
1600 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1601 : errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1602 : /* translator: %s is an SQL ALTER command */
1603 : errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
1604 : isadd ?
1605 : "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
1606 : "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
1607 :
1608 6 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1609 :
1610 : /* Refresh the new list of publications. */
1611 6 : sub->publications = publist;
1612 :
1613 6 : AlterSubscription_refresh(sub, opts.copy_data,
1614 : validate_publications);
1615 : }
1616 :
1617 18 : break;
1618 : }
1619 :
1620 58 : case ALTER_SUBSCRIPTION_REFRESH:
1621 : {
1622 58 : if (!sub->enabled)
1623 6 : ereport(ERROR,
1624 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1625 : errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
1626 :
1627 52 : parse_subscription_options(pstate, stmt->options,
1628 : SUBOPT_COPY_DATA, &opts);
1629 :
1630 : /*
1631 : * The subscription option "two_phase" requires that
1632 : * replication has passed the initial table synchronization
1633 : * phase before the two_phase becomes properly enabled.
1634 : *
1635 : * But, having reached this two-phase commit "enabled" state
1636 : * we must not allow any subsequent table initialization to
1637 : * occur. So the ALTER SUBSCRIPTION ... REFRESH is disallowed
1638 : * when the user had requested two_phase = on mode.
1639 : *
1640 : * The exception to this restriction is when copy_data =
1641 : * false, because when copy_data is false the tablesync will
1642 : * start already in READY state and will exit directly without
1643 : * doing anything.
1644 : *
1645 : * For more details see comments atop worker.c.
1646 : */
1647 52 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1648 0 : ereport(ERROR,
1649 : (errcode(ERRCODE_SYNTAX_ERROR),
1650 : errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"),
1651 : errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1652 :
1653 52 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
1654 :
1655 46 : AlterSubscription_refresh(sub, opts.copy_data, NULL);
1656 :
1657 44 : break;
1658 : }
1659 :
1660 24 : case ALTER_SUBSCRIPTION_SKIP:
1661 : {
1662 24 : parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
1663 :
1664 : /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
1665 : Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
1666 :
1667 : /*
1668 : * If the user sets subskiplsn, we do a sanity check to make
1669 : * sure that the specified LSN is a probable value.
1670 : */
1671 18 : if (!XLogRecPtrIsInvalid(opts.lsn))
1672 : {
1673 : RepOriginId originid;
1674 : char originname[NAMEDATALEN];
1675 : XLogRecPtr remote_lsn;
1676 :
1677 12 : ReplicationOriginNameForLogicalRep(subid, InvalidOid,
1678 : originname, sizeof(originname));
1679 12 : originid = replorigin_by_name(originname, false);
1680 12 : remote_lsn = replorigin_get_progress(originid, false);
1681 :
1682 : /* Check the given LSN is at least a future LSN */
1683 12 : if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn)
1684 0 : ereport(ERROR,
1685 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1686 : errmsg("skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
1687 : LSN_FORMAT_ARGS(opts.lsn),
1688 : LSN_FORMAT_ARGS(remote_lsn))));
1689 : }
1690 :
1691 18 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
1692 18 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
1693 :
1694 18 : update_tuple = true;
1695 18 : break;
1696 : }
1697 :
1698 0 : default:
1699 0 : elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
1700 : stmt->kind);
1701 : }
1702 :
1703 : /* Update the catalog if needed. */
1704 376 : if (update_tuple)
1705 : {
1706 332 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1707 : replaces);
1708 :
1709 332 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1710 :
1711 332 : heap_freetuple(tup);
1712 : }
1713 :
1714 : /*
1715 : * Try to acquire the connection necessary either for modifying the slot
1716 : * or for checking if the remote server permits enabling
1717 : * retain_dead_tuples.
1718 : *
1719 : * This has to be at the end because otherwise if there is an error while
1720 : * doing the database operations we won't be able to rollback altered
1721 : * slot.
1722 : */
1723 376 : if (update_failover || update_two_phase || check_pub_rdt)
1724 : {
1725 : bool must_use_password;
1726 : char *err;
1727 : WalReceiverConn *wrconn;
1728 :
1729 : /* Load the library providing us libpq calls. */
1730 20 : load_file("libpqwalreceiver", false);
1731 :
1732 : /*
1733 : * Try to connect to the publisher, using the new connection string if
1734 : * available.
1735 : */
1736 20 : must_use_password = sub->passwordrequired && !sub->ownersuperuser;
1737 20 : wrconn = walrcv_connect(stmt->conninfo ? stmt->conninfo : sub->conninfo,
1738 : true, true, must_use_password, sub->name,
1739 : &err);
1740 20 : if (!wrconn)
1741 0 : ereport(ERROR,
1742 : (errcode(ERRCODE_CONNECTION_FAILURE),
1743 : errmsg("subscription \"%s\" could not connect to the publisher: %s",
1744 : sub->name, err)));
1745 :
1746 20 : PG_TRY();
1747 : {
1748 20 : if (retain_dead_tuples)
1749 12 : check_pub_dead_tuple_retention(wrconn);
1750 :
1751 20 : check_publications_origin(wrconn, sub->publications, false,
1752 : retain_dead_tuples, origin, NULL, 0,
1753 : sub->name);
1754 :
1755 20 : if (update_failover || update_two_phase)
1756 8 : walrcv_alter_slot(wrconn, sub->slotname,
1757 : update_failover ? &opts.failover : NULL,
1758 : update_two_phase ? &opts.twophase : NULL);
1759 : }
1760 0 : PG_FINALLY();
1761 : {
1762 20 : walrcv_disconnect(wrconn);
1763 : }
1764 20 : PG_END_TRY();
1765 : }
1766 :
1767 376 : table_close(rel, RowExclusiveLock);
1768 :
1769 376 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
1770 :
1771 376 : InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
1772 :
1773 : /* Wake up related replication workers to handle this change quickly. */
1774 376 : LogicalRepWorkersWakeupAtCommit(subid);
1775 :
1776 376 : return myself;
1777 : }
1778 :
1779 : /*
1780 : * Drop a subscription
1781 : */
1782 : void
1783 240 : DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
1784 : {
1785 : Relation rel;
1786 : ObjectAddress myself;
1787 : HeapTuple tup;
1788 : Oid subid;
1789 : Oid subowner;
1790 : Datum datum;
1791 : bool isnull;
1792 : char *subname;
1793 : char *conninfo;
1794 : char *slotname;
1795 : List *subworkers;
1796 : ListCell *lc;
1797 : char originname[NAMEDATALEN];
1798 240 : char *err = NULL;
1799 : WalReceiverConn *wrconn;
1800 : Form_pg_subscription form;
1801 : List *rstates;
1802 : bool must_use_password;
1803 :
1804 : /*
1805 : * The launcher may concurrently start a new worker for this subscription.
1806 : * During initialization, the worker checks for subscription validity and
1807 : * exits if the subscription has already been dropped. See
1808 : * InitializeLogRepWorker.
1809 : */
1810 240 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1811 :
1812 240 : tup = SearchSysCache2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
1813 240 : CStringGetDatum(stmt->subname));
1814 :
1815 240 : if (!HeapTupleIsValid(tup))
1816 : {
1817 12 : table_close(rel, NoLock);
1818 :
1819 12 : if (!stmt->missing_ok)
1820 6 : ereport(ERROR,
1821 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1822 : errmsg("subscription \"%s\" does not exist",
1823 : stmt->subname)));
1824 : else
1825 6 : ereport(NOTICE,
1826 : (errmsg("subscription \"%s\" does not exist, skipping",
1827 : stmt->subname)));
1828 :
1829 84 : return;
1830 : }
1831 :
1832 228 : form = (Form_pg_subscription) GETSTRUCT(tup);
1833 228 : subid = form->oid;
1834 228 : subowner = form->subowner;
1835 228 : must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
1836 :
1837 : /* must be owner */
1838 228 : if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
1839 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
1840 0 : stmt->subname);
1841 :
1842 : /* DROP hook for the subscription being removed */
1843 228 : InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
1844 :
1845 : /*
1846 : * Lock the subscription so nobody else can do anything with it (including
1847 : * the replication workers).
1848 : */
1849 228 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
1850 :
1851 : /* Get subname */
1852 228 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
1853 : Anum_pg_subscription_subname);
1854 228 : subname = pstrdup(NameStr(*DatumGetName(datum)));
1855 :
1856 : /* Get conninfo */
1857 228 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
1858 : Anum_pg_subscription_subconninfo);
1859 228 : conninfo = TextDatumGetCString(datum);
1860 :
1861 : /* Get slotname */
1862 228 : datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
1863 : Anum_pg_subscription_subslotname, &isnull);
1864 228 : if (!isnull)
1865 148 : slotname = pstrdup(NameStr(*DatumGetName(datum)));
1866 : else
1867 80 : slotname = NULL;
1868 :
1869 : /*
1870 : * Since dropping a replication slot is not transactional, the replication
1871 : * slot stays dropped even if the transaction rolls back. So we cannot
1872 : * run DROP SUBSCRIPTION inside a transaction block if dropping the
1873 : * replication slot. Also, in this case, we report a message for dropping
1874 : * the subscription to the cumulative stats system.
1875 : *
1876 : * XXX The command name should really be something like "DROP SUBSCRIPTION
1877 : * of a subscription that is associated with a replication slot", but we
1878 : * don't have the proper facilities for that.
1879 : */
1880 228 : if (slotname)
1881 148 : PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
1882 :
1883 222 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
1884 222 : EventTriggerSQLDropAddObject(&myself, true, true);
1885 :
1886 : /* Remove the tuple from catalog. */
1887 222 : CatalogTupleDelete(rel, &tup->t_self);
1888 :
1889 222 : ReleaseSysCache(tup);
1890 :
1891 : /*
1892 : * Stop all the subscription workers immediately.
1893 : *
1894 : * This is necessary if we are dropping the replication slot, so that the
1895 : * slot becomes accessible.
1896 : *
1897 : * It is also necessary if the subscription is disabled and was disabled
1898 : * in the same transaction. Then the workers haven't seen the disabling
1899 : * yet and will still be running, leading to hangs later when we want to
1900 : * drop the replication origin. If the subscription was disabled before
1901 : * this transaction, then there shouldn't be any workers left, so this
1902 : * won't make a difference.
1903 : *
1904 : * New workers won't be started because we hold an exclusive lock on the
1905 : * subscription till the end of the transaction.
1906 : */
1907 222 : subworkers = logicalrep_workers_find(subid, false, true);
1908 368 : foreach(lc, subworkers)
1909 : {
1910 146 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
1911 :
1912 146 : logicalrep_worker_stop(w->subid, w->relid);
1913 : }
1914 222 : list_free(subworkers);
1915 :
1916 : /*
1917 : * Remove the no-longer-useful entry in the launcher's table of apply
1918 : * worker start times.
1919 : *
1920 : * If this transaction rolls back, the launcher might restart a failed
1921 : * apply worker before wal_retrieve_retry_interval milliseconds have
1922 : * elapsed, but that's pretty harmless.
1923 : */
1924 222 : ApplyLauncherForgetWorkerStartTime(subid);
1925 :
1926 : /*
1927 : * Cleanup of tablesync replication origins.
1928 : *
1929 : * Any READY-state relations would already have dealt with clean-ups.
1930 : *
1931 : * Note that the state can't change because we have already stopped both
1932 : * the apply and tablesync workers and they can't restart because of
1933 : * exclusive lock on the subscription.
1934 : */
1935 222 : rstates = GetSubscriptionRelations(subid, true);
1936 234 : foreach(lc, rstates)
1937 : {
1938 12 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
1939 12 : Oid relid = rstate->relid;
1940 :
1941 : /* Only cleanup resources of tablesync workers */
1942 12 : if (!OidIsValid(relid))
1943 0 : continue;
1944 :
1945 : /*
1946 : * Drop the tablesync's origin tracking if exists.
1947 : *
1948 : * It is possible that the origin is not yet created for tablesync
1949 : * worker so passing missing_ok = true. This can happen for the states
1950 : * before SUBREL_STATE_FINISHEDCOPY.
1951 : */
1952 12 : ReplicationOriginNameForLogicalRep(subid, relid, originname,
1953 : sizeof(originname));
1954 12 : replorigin_drop_by_name(originname, true, false);
1955 : }
1956 :
1957 : /* Clean up dependencies */
1958 222 : deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
1959 :
1960 : /* Remove any associated relation synchronization states. */
1961 222 : RemoveSubscriptionRel(subid, InvalidOid);
1962 :
1963 : /* Remove the origin tracking if exists. */
1964 222 : ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
1965 222 : replorigin_drop_by_name(originname, true, false);
1966 :
1967 : /*
1968 : * Tell the cumulative stats system that the subscription is getting
1969 : * dropped.
1970 : */
1971 222 : pgstat_drop_subscription(subid);
1972 :
1973 : /*
1974 : * If there is no slot associated with the subscription, we can finish
1975 : * here.
1976 : */
1977 222 : if (!slotname && rstates == NIL)
1978 : {
1979 78 : table_close(rel, NoLock);
1980 78 : return;
1981 : }
1982 :
1983 : /*
1984 : * Try to acquire the connection necessary for dropping slots.
1985 : *
1986 : * Note: If the slotname is NONE/NULL then we allow the command to finish
1987 : * and users need to manually cleanup the apply and tablesync worker slots
1988 : * later.
1989 : *
1990 : * This has to be at the end because otherwise if there is an error while
1991 : * doing the database operations we won't be able to rollback dropped
1992 : * slot.
1993 : */
1994 144 : load_file("libpqwalreceiver", false);
1995 :
1996 144 : wrconn = walrcv_connect(conninfo, true, true, must_use_password,
1997 : subname, &err);
1998 144 : if (wrconn == NULL)
1999 : {
2000 0 : if (!slotname)
2001 : {
2002 : /* be tidy */
2003 0 : list_free(rstates);
2004 0 : table_close(rel, NoLock);
2005 0 : return;
2006 : }
2007 : else
2008 : {
2009 0 : ReportSlotConnectionError(rstates, subid, slotname, err);
2010 : }
2011 : }
2012 :
2013 144 : PG_TRY();
2014 : {
2015 156 : foreach(lc, rstates)
2016 : {
2017 12 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
2018 12 : Oid relid = rstate->relid;
2019 :
2020 : /* Only cleanup resources of tablesync workers */
2021 12 : if (!OidIsValid(relid))
2022 0 : continue;
2023 :
2024 : /*
2025 : * Drop the tablesync slots associated with removed tables.
2026 : *
2027 : * For SYNCDONE/READY states, the tablesync slot is known to have
2028 : * already been dropped by the tablesync worker.
2029 : *
2030 : * For other states, there is no certainty, maybe the slot does
2031 : * not exist yet. Also, if we fail after removing some of the
2032 : * slots, next time, it will again try to drop already dropped
2033 : * slots and fail. For these reasons, we allow missing_ok = true
2034 : * for the drop.
2035 : */
2036 12 : if (rstate->state != SUBREL_STATE_SYNCDONE)
2037 : {
2038 10 : char syncslotname[NAMEDATALEN] = {0};
2039 :
2040 10 : ReplicationSlotNameForTablesync(subid, relid, syncslotname,
2041 : sizeof(syncslotname));
2042 10 : ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
2043 : }
2044 : }
2045 :
2046 144 : list_free(rstates);
2047 :
2048 : /*
2049 : * If there is a slot associated with the subscription, then drop the
2050 : * replication slot at the publisher.
2051 : */
2052 144 : if (slotname)
2053 142 : ReplicationSlotDropAtPubNode(wrconn, slotname, false);
2054 : }
2055 2 : PG_FINALLY();
2056 : {
2057 144 : walrcv_disconnect(wrconn);
2058 : }
2059 144 : PG_END_TRY();
2060 :
2061 142 : table_close(rel, NoLock);
2062 : }
2063 :
2064 : /*
2065 : * Drop the replication slot at the publisher node using the replication
2066 : * connection.
2067 : *
2068 : * missing_ok - if true then only issue a LOG message if the slot doesn't
2069 : * exist.
2070 : */
2071 : void
2072 530 : ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
2073 : {
2074 : StringInfoData cmd;
2075 :
2076 : Assert(wrconn);
2077 :
2078 530 : load_file("libpqwalreceiver", false);
2079 :
2080 530 : initStringInfo(&cmd);
2081 530 : appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
2082 :
2083 530 : PG_TRY();
2084 : {
2085 : WalRcvExecResult *res;
2086 :
2087 530 : res = walrcv_exec(wrconn, cmd.data, 0, NULL);
2088 :
2089 530 : if (res->status == WALRCV_OK_COMMAND)
2090 : {
2091 : /* NOTICE. Success. */
2092 524 : ereport(NOTICE,
2093 : (errmsg("dropped replication slot \"%s\" on publisher",
2094 : slotname)));
2095 : }
2096 6 : else if (res->status == WALRCV_ERROR &&
2097 4 : missing_ok &&
2098 4 : res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
2099 : {
2100 : /* LOG. Error, but missing_ok = true. */
2101 4 : ereport(LOG,
2102 : (errmsg("could not drop replication slot \"%s\" on publisher: %s",
2103 : slotname, res->err)));
2104 : }
2105 : else
2106 : {
2107 : /* ERROR. */
2108 2 : ereport(ERROR,
2109 : (errcode(ERRCODE_CONNECTION_FAILURE),
2110 : errmsg("could not drop replication slot \"%s\" on publisher: %s",
2111 : slotname, res->err)));
2112 : }
2113 :
2114 528 : walrcv_clear_result(res);
2115 : }
2116 2 : PG_FINALLY();
2117 : {
2118 530 : pfree(cmd.data);
2119 : }
2120 530 : PG_END_TRY();
2121 528 : }
2122 :
2123 : /*
2124 : * Internal workhorse for changing a subscription owner
2125 : */
2126 : static void
2127 18 : AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
2128 : {
2129 : Form_pg_subscription form;
2130 : AclResult aclresult;
2131 :
2132 18 : form = (Form_pg_subscription) GETSTRUCT(tup);
2133 :
2134 18 : if (form->subowner == newOwnerId)
2135 4 : return;
2136 :
2137 14 : if (!object_ownercheck(SubscriptionRelationId, form->oid, GetUserId()))
2138 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
2139 0 : NameStr(form->subname));
2140 :
2141 : /*
2142 : * Don't allow non-superuser modification of a subscription with
2143 : * password_required=false.
2144 : */
2145 14 : if (!form->subpasswordrequired && !superuser())
2146 0 : ereport(ERROR,
2147 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2148 : errmsg("password_required=false is superuser-only"),
2149 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
2150 :
2151 : /* Must be able to become new owner */
2152 14 : check_can_set_role(GetUserId(), newOwnerId);
2153 :
2154 : /*
2155 : * current owner must have CREATE on database
2156 : *
2157 : * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
2158 : * other object types behave differently (e.g. you can't give a table to a
2159 : * user who lacks CREATE privileges on a schema).
2160 : */
2161 8 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
2162 : GetUserId(), ACL_CREATE);
2163 8 : if (aclresult != ACLCHECK_OK)
2164 0 : aclcheck_error(aclresult, OBJECT_DATABASE,
2165 0 : get_database_name(MyDatabaseId));
2166 :
2167 8 : form->subowner = newOwnerId;
2168 8 : CatalogTupleUpdate(rel, &tup->t_self, tup);
2169 :
2170 : /* Update owner dependency reference */
2171 8 : changeDependencyOnOwner(SubscriptionRelationId,
2172 : form->oid,
2173 : newOwnerId);
2174 :
2175 8 : InvokeObjectPostAlterHook(SubscriptionRelationId,
2176 : form->oid, 0);
2177 :
2178 : /* Wake up related background processes to handle this change quickly. */
2179 8 : ApplyLauncherWakeupAtCommit();
2180 8 : LogicalRepWorkersWakeupAtCommit(form->oid);
2181 : }
2182 :
2183 : /*
2184 : * Change subscription owner -- by name
2185 : */
2186 : ObjectAddress
2187 18 : AlterSubscriptionOwner(const char *name, Oid newOwnerId)
2188 : {
2189 : Oid subid;
2190 : HeapTuple tup;
2191 : Relation rel;
2192 : ObjectAddress address;
2193 : Form_pg_subscription form;
2194 :
2195 18 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
2196 :
2197 18 : tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
2198 : CStringGetDatum(name));
2199 :
2200 18 : if (!HeapTupleIsValid(tup))
2201 0 : ereport(ERROR,
2202 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2203 : errmsg("subscription \"%s\" does not exist", name)));
2204 :
2205 18 : form = (Form_pg_subscription) GETSTRUCT(tup);
2206 18 : subid = form->oid;
2207 :
2208 18 : AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
2209 :
2210 12 : ObjectAddressSet(address, SubscriptionRelationId, subid);
2211 :
2212 12 : heap_freetuple(tup);
2213 :
2214 12 : table_close(rel, RowExclusiveLock);
2215 :
2216 12 : return address;
2217 : }
2218 :
2219 : /*
2220 : * Change subscription owner -- by OID
2221 : */
2222 : void
2223 0 : AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
2224 : {
2225 : HeapTuple tup;
2226 : Relation rel;
2227 :
2228 0 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
2229 :
2230 0 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
2231 :
2232 0 : if (!HeapTupleIsValid(tup))
2233 0 : ereport(ERROR,
2234 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2235 : errmsg("subscription with OID %u does not exist", subid)));
2236 :
2237 0 : AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
2238 :
2239 0 : heap_freetuple(tup);
2240 :
2241 0 : table_close(rel, RowExclusiveLock);
2242 0 : }
2243 :
2244 : /*
2245 : * Check and log a warning if the publisher has subscribed to the same table,
2246 : * its partition ancestors (if it's a partition), or its partition children (if
2247 : * it's a partitioned table), from some other publishers. This check is
2248 : * required in the following scenarios:
2249 : *
2250 : * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements
2251 : * with "copy_data = true" and "origin = none":
2252 : * - Warn the user that data with an origin might have been copied.
2253 : * - This check is skipped for tables already added, as incremental sync via
2254 : * WAL allows origin tracking. The list of such tables is in
2255 : * subrel_local_oids.
2256 : *
2257 : * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements
2258 : * with "retain_dead_tuples = true" and "origin = any", and for ALTER
2259 : * SUBSCRIPTION statements that modify retain_dead_tuples or origin, or
2260 : * when the publisher's status changes (e.g., due to a connection string
2261 : * update):
2262 : * - Warn the user that only conflict detection info for local changes on
2263 : * the publisher is retained. Data from other origins may lack sufficient
2264 : * details for reliable conflict detection.
2265 : * - See comments atop worker.c for more details.
2266 : */
2267 : static void
2268 320 : check_publications_origin(WalReceiverConn *wrconn, List *publications,
2269 : bool copydata, bool retain_dead_tuples,
2270 : char *origin, Oid *subrel_local_oids,
2271 : int subrel_count, char *subname)
2272 : {
2273 : WalRcvExecResult *res;
2274 : StringInfoData cmd;
2275 : TupleTableSlot *slot;
2276 320 : Oid tableRow[1] = {TEXTOID};
2277 320 : List *publist = NIL;
2278 : int i;
2279 : bool check_rdt;
2280 : bool check_table_sync;
2281 640 : bool origin_none = origin &&
2282 320 : pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0;
2283 :
2284 : /*
2285 : * Enable retain_dead_tuples checks only when origin is set to 'any',
2286 : * since with origin='none' only local changes are replicated to the
2287 : * subscriber.
2288 : */
2289 320 : check_rdt = retain_dead_tuples && !origin_none;
2290 :
2291 : /*
2292 : * Enable table synchronization checks only when origin is 'none', to
2293 : * ensure that data from other origins is not inadvertently copied.
2294 : */
2295 320 : check_table_sync = copydata && origin_none;
2296 :
2297 : /* retain_dead_tuples and table sync checks occur separately */
2298 : Assert(!(check_rdt && check_table_sync));
2299 :
2300 : /* Return if no checks are required */
2301 320 : if (!check_rdt && !check_table_sync)
2302 294 : return;
2303 :
2304 26 : initStringInfo(&cmd);
2305 26 : appendStringInfoString(&cmd,
2306 : "SELECT DISTINCT P.pubname AS pubname\n"
2307 : "FROM pg_publication P,\n"
2308 : " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
2309 : " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid OR"
2310 : " GPT.relid IN (SELECT relid FROM pg_partition_ancestors(PS.srrelid) UNION"
2311 : " SELECT relid FROM pg_partition_tree(PS.srrelid))),\n"
2312 : " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2313 : "WHERE C.oid = GPT.relid AND P.pubname IN (");
2314 26 : GetPublicationsStr(publications, &cmd, true);
2315 26 : appendStringInfoString(&cmd, ")\n");
2316 :
2317 : /*
2318 : * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains
2319 : * the list of relation oids that are already present on the subscriber.
2320 : * This check should be skipped for these tables if checking for table
2321 : * sync scenario. However, when handling the retain_dead_tuples scenario,
2322 : * ensure all tables are checked, as some existing tables may now include
2323 : * changes from other origins due to newly created subscriptions on the
2324 : * publisher.
2325 : */
2326 26 : if (check_table_sync)
2327 : {
2328 24 : for (i = 0; i < subrel_count; i++)
2329 : {
2330 6 : Oid relid = subrel_local_oids[i];
2331 6 : char *schemaname = get_namespace_name(get_rel_namespace(relid));
2332 6 : char *tablename = get_rel_name(relid);
2333 :
2334 6 : appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
2335 : schemaname, tablename);
2336 : }
2337 : }
2338 :
2339 26 : res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
2340 26 : pfree(cmd.data);
2341 :
2342 26 : if (res->status != WALRCV_OK_TUPLES)
2343 0 : ereport(ERROR,
2344 : (errcode(ERRCODE_CONNECTION_FAILURE),
2345 : errmsg("could not receive list of replicated tables from the publisher: %s",
2346 : res->err)));
2347 :
2348 : /* Process tables. */
2349 26 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2350 36 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2351 : {
2352 : char *pubname;
2353 : bool isnull;
2354 :
2355 10 : pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2356 : Assert(!isnull);
2357 :
2358 10 : ExecClearTuple(slot);
2359 10 : publist = list_append_unique(publist, makeString(pubname));
2360 : }
2361 :
2362 : /*
2363 : * Log a warning if the publisher has subscribed to the same table from
2364 : * some other publisher. We cannot know the origin of data during the
2365 : * initial sync. Data origins can be found only from the WAL by looking at
2366 : * the origin id.
2367 : *
2368 : * XXX: For simplicity, we don't check whether the table has any data or
2369 : * not. If the table doesn't have any data then we don't need to
2370 : * distinguish between data having origin and data not having origin so we
2371 : * can avoid logging a warning for table sync scenario.
2372 : */
2373 26 : if (publist)
2374 : {
2375 10 : StringInfo pubnames = makeStringInfo();
2376 10 : StringInfo err_msg = makeStringInfo();
2377 10 : StringInfo err_hint = makeStringInfo();
2378 :
2379 : /* Prepare the list of publication(s) for warning message. */
2380 10 : GetPublicationsStr(publist, pubnames, false);
2381 :
2382 10 : if (check_table_sync)
2383 : {
2384 8 : appendStringInfo(err_msg, _("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin"),
2385 : subname);
2386 8 : appendStringInfoString(err_hint, _("Verify that initial data copied from the publisher tables did not come from other origins."));
2387 : }
2388 : else
2389 : {
2390 2 : appendStringInfo(err_msg, _("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins"),
2391 : subname);
2392 2 : appendStringInfoString(err_hint, _("Consider using origin = NONE or disabling retain_dead_tuples."));
2393 : }
2394 :
2395 10 : ereport(WARNING,
2396 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2397 : errmsg_internal("%s", err_msg->data),
2398 : errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2399 : "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2400 : list_length(publist), pubnames->data),
2401 : errhint_internal("%s", err_hint->data));
2402 : }
2403 :
2404 26 : ExecDropSingleTupleTableSlot(slot);
2405 :
2406 26 : walrcv_clear_result(res);
2407 : }
2408 :
2409 : /*
2410 : * Determine whether the retain_dead_tuples can be enabled based on the
2411 : * publisher's status.
2412 : *
2413 : * This option is disallowed if the publisher is running a version earlier
2414 : * than the PG19, or if the publisher is in recovery (i.e., it is a standby
2415 : * server).
2416 : *
2417 : * See comments atop worker.c for a detailed explanation.
2418 : */
2419 : static void
2420 18 : check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
2421 : {
2422 : WalRcvExecResult *res;
2423 18 : Oid RecoveryRow[1] = {BOOLOID};
2424 : TupleTableSlot *slot;
2425 : bool isnull;
2426 : bool remote_in_recovery;
2427 :
2428 18 : if (walrcv_server_version(wrconn) < 19000)
2429 0 : ereport(ERROR,
2430 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2431 : errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
2432 :
2433 18 : res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow);
2434 :
2435 18 : if (res->status != WALRCV_OK_TUPLES)
2436 0 : ereport(ERROR,
2437 : (errcode(ERRCODE_CONNECTION_FAILURE),
2438 : errmsg("could not obtain recovery progress from the publisher: %s",
2439 : res->err)));
2440 :
2441 18 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2442 18 : if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2443 0 : elog(ERROR, "failed to fetch tuple for the recovery progress");
2444 :
2445 18 : remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull));
2446 :
2447 18 : if (remote_in_recovery)
2448 0 : ereport(ERROR,
2449 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2450 : errmsg("cannot enable retain_dead_tuples if the publisher is in recovery."));
2451 :
2452 18 : ExecDropSingleTupleTableSlot(slot);
2453 :
2454 18 : walrcv_clear_result(res);
2455 18 : }
2456 :
2457 : /*
2458 : * Check if the subscriber's configuration is adequate to enable the
2459 : * retain_dead_tuples option.
2460 : *
2461 : * Issue an ERROR if the wal_level does not support the use of replication
2462 : * slots when check_guc is set to true.
2463 : *
2464 : * Issue a WARNING if track_commit_timestamp is not enabled when check_guc is
2465 : * set to true. This is only to highlight the importance of enabling
2466 : * track_commit_timestamp instead of catching all the misconfigurations, as
2467 : * this setting can be adjusted after subscription creation. Without it, the
2468 : * apply worker will simply skip conflict detection.
2469 : *
2470 : * Issue a WARNING or NOTICE if the subscription is disabled. Do not raise an
2471 : * ERROR since users can only modify retain_dead_tuples for disabled
2472 : * subscriptions. And as long as the subscription is enabled promptly, it will
2473 : * not pose issues.
2474 : */
2475 : void
2476 22 : CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
2477 : int elevel_for_sub_disabled)
2478 : {
2479 : Assert(elevel_for_sub_disabled == NOTICE ||
2480 : elevel_for_sub_disabled == WARNING);
2481 :
2482 22 : if (check_guc && wal_level < WAL_LEVEL_REPLICA)
2483 0 : ereport(ERROR,
2484 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2485 : errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
2486 : errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
2487 :
2488 22 : if (check_guc && !track_commit_timestamp)
2489 8 : ereport(WARNING,
2490 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2491 : errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
2492 : errhint("Consider setting \"%s\" to true.",
2493 : "track_commit_timestamp"));
2494 :
2495 22 : if (sub_disabled)
2496 10 : ereport(elevel_for_sub_disabled,
2497 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2498 : errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
2499 : (elevel_for_sub_disabled > NOTICE)
2500 : ? errhint("Consider setting %s to false.",
2501 : "retain_dead_tuples") : 0);
2502 22 : }
2503 :
2504 : /*
2505 : * Get the list of tables which belong to specified publications on the
2506 : * publisher connection.
2507 : *
2508 : * Note that we don't support the case where the column list is different for
2509 : * the same table in different publications to avoid sending unwanted column
2510 : * information for some of the rows. This can happen when both the column
2511 : * list and row filter are specified for different publications.
2512 : */
2513 : static List *
2514 300 : fetch_table_list(WalReceiverConn *wrconn, List *publications)
2515 : {
2516 : WalRcvExecResult *res;
2517 : StringInfoData cmd;
2518 : TupleTableSlot *slot;
2519 300 : Oid tableRow[3] = {TEXTOID, TEXTOID, InvalidOid};
2520 300 : List *tablelist = NIL;
2521 300 : int server_version = walrcv_server_version(wrconn);
2522 300 : bool check_columnlist = (server_version >= 150000);
2523 300 : StringInfo pub_names = makeStringInfo();
2524 :
2525 300 : initStringInfo(&cmd);
2526 :
2527 : /* Build the pub_names comma-separated string. */
2528 300 : GetPublicationsStr(publications, pub_names, true);
2529 :
2530 : /* Get the list of tables from the publisher. */
2531 300 : if (server_version >= 160000)
2532 : {
2533 300 : tableRow[2] = INT2VECTOROID;
2534 :
2535 : /*
2536 : * From version 16, we allowed passing multiple publications to the
2537 : * function pg_get_publication_tables. This helped to filter out the
2538 : * partition table whose ancestor is also published in this
2539 : * publication array.
2540 : *
2541 : * Join pg_get_publication_tables with pg_publication to exclude
2542 : * non-existing publications.
2543 : *
2544 : * Note that attrs are always stored in sorted order so we don't need
2545 : * to worry if different publications have specified them in a
2546 : * different order. See pub_collist_validate.
2547 : */
2548 300 : appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, gpt.attrs\n"
2549 : " FROM pg_class c\n"
2550 : " JOIN pg_namespace n ON n.oid = c.relnamespace\n"
2551 : " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
2552 : " FROM pg_publication\n"
2553 : " WHERE pubname IN ( %s )) AS gpt\n"
2554 : " ON gpt.relid = c.oid\n",
2555 : pub_names->data);
2556 : }
2557 : else
2558 : {
2559 0 : tableRow[2] = NAMEARRAYOID;
2560 0 : appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n");
2561 :
2562 : /* Get column lists for each relation if the publisher supports it */
2563 0 : if (check_columnlist)
2564 0 : appendStringInfoString(&cmd, ", t.attnames\n");
2565 :
2566 0 : appendStringInfo(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
2567 : " WHERE t.pubname IN ( %s )",
2568 : pub_names->data);
2569 : }
2570 :
2571 300 : destroyStringInfo(pub_names);
2572 :
2573 300 : res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow);
2574 300 : pfree(cmd.data);
2575 :
2576 300 : if (res->status != WALRCV_OK_TUPLES)
2577 0 : ereport(ERROR,
2578 : (errcode(ERRCODE_CONNECTION_FAILURE),
2579 : errmsg("could not receive list of replicated tables from the publisher: %s",
2580 : res->err)));
2581 :
2582 : /* Process tables. */
2583 300 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2584 838 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2585 : {
2586 : char *nspname;
2587 : char *relname;
2588 : bool isnull;
2589 : RangeVar *rv;
2590 :
2591 540 : nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2592 : Assert(!isnull);
2593 540 : relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
2594 : Assert(!isnull);
2595 :
2596 540 : rv = makeRangeVar(nspname, relname, -1);
2597 :
2598 540 : if (check_columnlist && list_member(tablelist, rv))
2599 2 : ereport(ERROR,
2600 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2601 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
2602 : nspname, relname));
2603 : else
2604 538 : tablelist = lappend(tablelist, rv);
2605 :
2606 538 : ExecClearTuple(slot);
2607 : }
2608 298 : ExecDropSingleTupleTableSlot(slot);
2609 :
2610 298 : walrcv_clear_result(res);
2611 :
2612 298 : return tablelist;
2613 : }
2614 :
2615 : /*
2616 : * This is to report the connection failure while dropping replication slots.
2617 : * Here, we report the WARNING for all tablesync slots so that user can drop
2618 : * them manually, if required.
2619 : */
2620 : static void
2621 0 : ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
2622 : {
2623 : ListCell *lc;
2624 :
2625 0 : foreach(lc, rstates)
2626 : {
2627 0 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
2628 0 : Oid relid = rstate->relid;
2629 :
2630 : /* Only cleanup resources of tablesync workers */
2631 0 : if (!OidIsValid(relid))
2632 0 : continue;
2633 :
2634 : /*
2635 : * Caller needs to ensure that relstate doesn't change underneath us.
2636 : * See DropSubscription where we get the relstates.
2637 : */
2638 0 : if (rstate->state != SUBREL_STATE_SYNCDONE)
2639 : {
2640 0 : char syncslotname[NAMEDATALEN] = {0};
2641 :
2642 0 : ReplicationSlotNameForTablesync(subid, relid, syncslotname,
2643 : sizeof(syncslotname));
2644 0 : elog(WARNING, "could not drop tablesync replication slot \"%s\"",
2645 : syncslotname);
2646 : }
2647 : }
2648 :
2649 0 : ereport(ERROR,
2650 : (errcode(ERRCODE_CONNECTION_FAILURE),
2651 : errmsg("could not connect to publisher when attempting to drop replication slot \"%s\": %s",
2652 : slotname, err),
2653 : /* translator: %s is an SQL ALTER command */
2654 : errhint("Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
2655 : "ALTER SUBSCRIPTION ... DISABLE",
2656 : "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
2657 : }
2658 :
2659 : /*
2660 : * Check for duplicates in the given list of publications and error out if
2661 : * found one. Add publications to datums as text datums, if datums is not
2662 : * NULL.
2663 : */
2664 : static void
2665 440 : check_duplicates_in_publist(List *publist, Datum *datums)
2666 : {
2667 : ListCell *cell;
2668 440 : int j = 0;
2669 :
2670 1004 : foreach(cell, publist)
2671 : {
2672 582 : char *name = strVal(lfirst(cell));
2673 : ListCell *pcell;
2674 :
2675 880 : foreach(pcell, publist)
2676 : {
2677 880 : char *pname = strVal(lfirst(pcell));
2678 :
2679 880 : if (pcell == cell)
2680 564 : break;
2681 :
2682 316 : if (strcmp(name, pname) == 0)
2683 18 : ereport(ERROR,
2684 : (errcode(ERRCODE_DUPLICATE_OBJECT),
2685 : errmsg("publication name \"%s\" used more than once",
2686 : pname)));
2687 : }
2688 :
2689 564 : if (datums)
2690 478 : datums[j++] = CStringGetTextDatum(name);
2691 : }
2692 422 : }
2693 :
2694 : /*
2695 : * Merge current subscription's publications and user-specified publications
2696 : * from ADD/DROP PUBLICATIONS.
2697 : *
2698 : * If addpub is true, we will add the list of publications into oldpublist.
2699 : * Otherwise, we will delete the list of publications from oldpublist. The
2700 : * returned list is a copy, oldpublist itself is not changed.
2701 : *
2702 : * subname is the subscription name, for error messages.
2703 : */
2704 : static List *
2705 54 : merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
2706 : {
2707 : ListCell *lc;
2708 :
2709 54 : oldpublist = list_copy(oldpublist);
2710 :
2711 54 : check_duplicates_in_publist(newpublist, NULL);
2712 :
2713 92 : foreach(lc, newpublist)
2714 : {
2715 68 : char *name = strVal(lfirst(lc));
2716 : ListCell *lc2;
2717 68 : bool found = false;
2718 :
2719 134 : foreach(lc2, oldpublist)
2720 : {
2721 110 : char *pubname = strVal(lfirst(lc2));
2722 :
2723 110 : if (strcmp(name, pubname) == 0)
2724 : {
2725 44 : found = true;
2726 44 : if (addpub)
2727 12 : ereport(ERROR,
2728 : (errcode(ERRCODE_DUPLICATE_OBJECT),
2729 : errmsg("publication \"%s\" is already in subscription \"%s\"",
2730 : name, subname)));
2731 : else
2732 32 : oldpublist = foreach_delete_current(oldpublist, lc2);
2733 :
2734 32 : break;
2735 : }
2736 : }
2737 :
2738 56 : if (addpub && !found)
2739 18 : oldpublist = lappend(oldpublist, makeString(name));
2740 38 : else if (!addpub && !found)
2741 6 : ereport(ERROR,
2742 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2743 : errmsg("publication \"%s\" is not in subscription \"%s\"",
2744 : name, subname)));
2745 : }
2746 :
2747 : /*
2748 : * XXX Probably no strong reason for this, but for now it's to make ALTER
2749 : * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
2750 : */
2751 24 : if (!oldpublist)
2752 6 : ereport(ERROR,
2753 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2754 : errmsg("cannot drop all the publications from a subscription")));
2755 :
2756 18 : return oldpublist;
2757 : }
2758 :
2759 : /*
2760 : * Extract the streaming mode value from a DefElem. This is like
2761 : * defGetBoolean() but also accepts the special value of "parallel".
2762 : */
2763 : char
2764 814 : defGetStreamingMode(DefElem *def)
2765 : {
2766 : /*
2767 : * If no parameter value given, assume "true" is meant.
2768 : */
2769 814 : if (!def->arg)
2770 0 : return LOGICALREP_STREAM_ON;
2771 :
2772 : /*
2773 : * Allow 0, 1, "false", "true", "off", "on" or "parallel".
2774 : */
2775 814 : switch (nodeTag(def->arg))
2776 : {
2777 0 : case T_Integer:
2778 0 : switch (intVal(def->arg))
2779 : {
2780 0 : case 0:
2781 0 : return LOGICALREP_STREAM_OFF;
2782 0 : case 1:
2783 0 : return LOGICALREP_STREAM_ON;
2784 0 : default:
2785 : /* otherwise, error out below */
2786 0 : break;
2787 : }
2788 0 : break;
2789 814 : default:
2790 : {
2791 814 : char *sval = defGetString(def);
2792 :
2793 : /*
2794 : * The set of strings accepted here should match up with the
2795 : * grammar's opt_boolean_or_string production.
2796 : */
2797 1622 : if (pg_strcasecmp(sval, "false") == 0 ||
2798 808 : pg_strcasecmp(sval, "off") == 0)
2799 12 : return LOGICALREP_STREAM_OFF;
2800 1586 : if (pg_strcasecmp(sval, "true") == 0 ||
2801 784 : pg_strcasecmp(sval, "on") == 0)
2802 92 : return LOGICALREP_STREAM_ON;
2803 710 : if (pg_strcasecmp(sval, "parallel") == 0)
2804 704 : return LOGICALREP_STREAM_PARALLEL;
2805 : }
2806 6 : break;
2807 : }
2808 :
2809 6 : ereport(ERROR,
2810 : (errcode(ERRCODE_SYNTAX_ERROR),
2811 : errmsg("%s requires a Boolean value or \"parallel\"",
2812 : def->defname)));
2813 : return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
2814 : }
|