Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * subscriptioncmds.c
4 : * subscription catalog manipulation functions
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/subscriptioncmds.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/commit_ts.h"
18 : #include "access/htup_details.h"
19 : #include "access/table.h"
20 : #include "access/twophase.h"
21 : #include "access/xact.h"
22 : #include "catalog/catalog.h"
23 : #include "catalog/dependency.h"
24 : #include "catalog/indexing.h"
25 : #include "catalog/namespace.h"
26 : #include "catalog/objectaccess.h"
27 : #include "catalog/objectaddress.h"
28 : #include "catalog/pg_authid_d.h"
29 : #include "catalog/pg_database_d.h"
30 : #include "catalog/pg_foreign_server.h"
31 : #include "catalog/pg_subscription.h"
32 : #include "catalog/pg_subscription_rel.h"
33 : #include "catalog/pg_type.h"
34 : #include "catalog/pg_user_mapping.h"
35 : #include "commands/defrem.h"
36 : #include "commands/event_trigger.h"
37 : #include "commands/subscriptioncmds.h"
38 : #include "executor/executor.h"
39 : #include "foreign/foreign.h"
40 : #include "miscadmin.h"
41 : #include "nodes/makefuncs.h"
42 : #include "pgstat.h"
43 : #include "replication/logicallauncher.h"
44 : #include "replication/logicalworker.h"
45 : #include "replication/origin.h"
46 : #include "replication/slot.h"
47 : #include "replication/walreceiver.h"
48 : #include "replication/walsender.h"
49 : #include "replication/worker_internal.h"
50 : #include "storage/lmgr.h"
51 : #include "utils/acl.h"
52 : #include "utils/builtins.h"
53 : #include "utils/guc.h"
54 : #include "utils/lsyscache.h"
55 : #include "utils/memutils.h"
56 : #include "utils/pg_lsn.h"
57 : #include "utils/syscache.h"
58 :
59 : /*
60 : * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
61 : * command.
62 : */
63 : #define SUBOPT_CONNECT 0x00000001
64 : #define SUBOPT_ENABLED 0x00000002
65 : #define SUBOPT_CREATE_SLOT 0x00000004
66 : #define SUBOPT_SLOT_NAME 0x00000008
67 : #define SUBOPT_COPY_DATA 0x00000010
68 : #define SUBOPT_SYNCHRONOUS_COMMIT 0x00000020
69 : #define SUBOPT_REFRESH 0x00000040
70 : #define SUBOPT_BINARY 0x00000080
71 : #define SUBOPT_STREAMING 0x00000100
72 : #define SUBOPT_TWOPHASE_COMMIT 0x00000200
73 : #define SUBOPT_DISABLE_ON_ERR 0x00000400
74 : #define SUBOPT_PASSWORD_REQUIRED 0x00000800
75 : #define SUBOPT_RUN_AS_OWNER 0x00001000
76 : #define SUBOPT_FAILOVER 0x00002000
77 : #define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000
78 : #define SUBOPT_MAX_RETENTION_DURATION 0x00008000
79 : #define SUBOPT_WAL_RECEIVER_TIMEOUT 0x00010000
80 : #define SUBOPT_LSN 0x00020000
81 : #define SUBOPT_ORIGIN 0x00040000
82 :
83 : /* check if the 'val' has 'bits' set */
84 : #define IsSet(val, bits) (((val) & (bits)) == (bits))
85 :
86 : /*
87 : * Structure to hold a bitmap representing the user-provided CREATE/ALTER
88 : * SUBSCRIPTION command options and the parsed/default values of each of them.
89 : */
90 : typedef struct SubOpts
91 : {
92 : uint32 specified_opts;
93 : char *slot_name;
94 : char *synchronous_commit;
95 : bool connect;
96 : bool enabled;
97 : bool create_slot;
98 : bool copy_data;
99 : bool refresh;
100 : bool binary;
101 : char streaming;
102 : bool twophase;
103 : bool disableonerr;
104 : bool passwordrequired;
105 : bool runasowner;
106 : bool failover;
107 : bool retaindeadtuples;
108 : int32 maxretention;
109 : char *origin;
110 : XLogRecPtr lsn;
111 : char *wal_receiver_timeout;
112 : } SubOpts;
113 :
114 : /*
115 : * PublicationRelKind represents a relation included in a publication.
116 : * It stores the schema-qualified relation name (rv) and its kind (relkind).
117 : */
118 : typedef struct PublicationRelKind
119 : {
120 : RangeVar *rv;
121 : char relkind;
122 : } PublicationRelKind;
123 :
124 : static List *fetch_relation_list(WalReceiverConn *wrconn, List *publications);
125 : static void check_publications_origin_tables(WalReceiverConn *wrconn,
126 : List *publications, bool copydata,
127 : bool retain_dead_tuples,
128 : char *origin,
129 : Oid *subrel_local_oids,
130 : int subrel_count, char *subname);
131 : static void check_publications_origin_sequences(WalReceiverConn *wrconn,
132 : List *publications,
133 : bool copydata, char *origin,
134 : Oid *subrel_local_oids,
135 : int subrel_count,
136 : char *subname);
137 : static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn);
138 : static void check_duplicates_in_publist(List *publist, Datum *datums);
139 : static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
140 : static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
141 : static void CheckAlterSubOption(Subscription *sub, const char *option,
142 : bool slot_needs_update, bool isTopLevel);
143 :
144 :
145 : /*
146 : * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
147 : *
148 : * Since not all options can be specified in both commands, this function
149 : * will report an error if mutually exclusive options are specified.
150 : */
151 : static void
152 624 : parse_subscription_options(ParseState *pstate, List *stmt_options,
153 : uint32 supported_opts, SubOpts *opts)
154 : {
155 : ListCell *lc;
156 :
157 : /* Start out with cleared opts. */
158 624 : memset(opts, 0, sizeof(SubOpts));
159 :
160 : /* caller must expect some option */
161 : Assert(supported_opts != 0);
162 :
163 : /* If connect option is supported, these others also need to be. */
164 : Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
165 : IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
166 : SUBOPT_COPY_DATA));
167 :
168 : /* Set default values for the supported options. */
169 624 : if (IsSet(supported_opts, SUBOPT_CONNECT))
170 308 : opts->connect = true;
171 624 : if (IsSet(supported_opts, SUBOPT_ENABLED))
172 366 : opts->enabled = true;
173 624 : if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
174 308 : opts->create_slot = true;
175 624 : if (IsSet(supported_opts, SUBOPT_COPY_DATA))
176 395 : opts->copy_data = true;
177 624 : if (IsSet(supported_opts, SUBOPT_REFRESH))
178 54 : opts->refresh = true;
179 624 : if (IsSet(supported_opts, SUBOPT_BINARY))
180 464 : opts->binary = false;
181 624 : if (IsSet(supported_opts, SUBOPT_STREAMING))
182 464 : opts->streaming = LOGICALREP_STREAM_PARALLEL;
183 624 : if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
184 464 : opts->twophase = false;
185 624 : if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
186 464 : opts->disableonerr = false;
187 624 : if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED))
188 464 : opts->passwordrequired = true;
189 624 : if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER))
190 464 : opts->runasowner = false;
191 624 : if (IsSet(supported_opts, SUBOPT_FAILOVER))
192 464 : opts->failover = false;
193 624 : if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES))
194 464 : opts->retaindeadtuples = false;
195 624 : if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION))
196 464 : opts->maxretention = 0;
197 624 : if (IsSet(supported_opts, SUBOPT_ORIGIN))
198 464 : opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
199 :
200 : /* Parse options */
201 1257 : foreach(lc, stmt_options)
202 : {
203 689 : DefElem *defel = (DefElem *) lfirst(lc);
204 :
205 689 : if (IsSet(supported_opts, SUBOPT_CONNECT) &&
206 409 : strcmp(defel->defname, "connect") == 0)
207 : {
208 145 : if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
209 0 : errorConflictingDefElem(defel, pstate);
210 :
211 145 : opts->specified_opts |= SUBOPT_CONNECT;
212 145 : opts->connect = defGetBoolean(defel);
213 : }
214 544 : else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
215 322 : strcmp(defel->defname, "enabled") == 0)
216 : {
217 82 : if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
218 0 : errorConflictingDefElem(defel, pstate);
219 :
220 82 : opts->specified_opts |= SUBOPT_ENABLED;
221 82 : opts->enabled = defGetBoolean(defel);
222 : }
223 462 : else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
224 240 : strcmp(defel->defname, "create_slot") == 0)
225 : {
226 25 : if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
227 0 : errorConflictingDefElem(defel, pstate);
228 :
229 25 : opts->specified_opts |= SUBOPT_CREATE_SLOT;
230 25 : opts->create_slot = defGetBoolean(defel);
231 : }
232 437 : else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
233 373 : strcmp(defel->defname, "slot_name") == 0)
234 : {
235 121 : if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
236 0 : errorConflictingDefElem(defel, pstate);
237 :
238 121 : opts->specified_opts |= SUBOPT_SLOT_NAME;
239 121 : opts->slot_name = defGetString(defel);
240 :
241 : /* Setting slot_name = NONE is treated as no slot name. */
242 238 : if (strcmp(opts->slot_name, "none") == 0)
243 97 : opts->slot_name = NULL;
244 : else
245 24 : ReplicationSlotValidateName(opts->slot_name, false, ERROR);
246 : }
247 316 : else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
248 198 : strcmp(defel->defname, "copy_data") == 0)
249 : {
250 31 : if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
251 0 : errorConflictingDefElem(defel, pstate);
252 :
253 31 : opts->specified_opts |= SUBOPT_COPY_DATA;
254 31 : opts->copy_data = defGetBoolean(defel);
255 : }
256 285 : else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
257 226 : strcmp(defel->defname, "synchronous_commit") == 0)
258 : {
259 8 : if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
260 0 : errorConflictingDefElem(defel, pstate);
261 :
262 8 : opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
263 8 : opts->synchronous_commit = defGetString(defel);
264 :
265 : /* Test if the given value is valid for synchronous_commit GUC. */
266 8 : (void) set_config_option("synchronous_commit", opts->synchronous_commit,
267 : PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
268 : false, 0, false);
269 : }
270 277 : else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
271 44 : strcmp(defel->defname, "refresh") == 0)
272 : {
273 44 : if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
274 0 : errorConflictingDefElem(defel, pstate);
275 :
276 44 : opts->specified_opts |= SUBOPT_REFRESH;
277 44 : opts->refresh = defGetBoolean(defel);
278 : }
279 233 : else if (IsSet(supported_opts, SUBOPT_BINARY) &&
280 218 : strcmp(defel->defname, "binary") == 0)
281 : {
282 19 : if (IsSet(opts->specified_opts, SUBOPT_BINARY))
283 0 : errorConflictingDefElem(defel, pstate);
284 :
285 19 : opts->specified_opts |= SUBOPT_BINARY;
286 19 : opts->binary = defGetBoolean(defel);
287 : }
288 214 : else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
289 199 : strcmp(defel->defname, "streaming") == 0)
290 : {
291 43 : if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
292 0 : errorConflictingDefElem(defel, pstate);
293 :
294 43 : opts->specified_opts |= SUBOPT_STREAMING;
295 43 : opts->streaming = defGetStreamingMode(defel);
296 : }
297 171 : else if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
298 156 : strcmp(defel->defname, "two_phase") == 0)
299 : {
300 24 : if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
301 0 : errorConflictingDefElem(defel, pstate);
302 :
303 24 : opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
304 24 : opts->twophase = defGetBoolean(defel);
305 : }
306 147 : else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
307 132 : strcmp(defel->defname, "disable_on_error") == 0)
308 : {
309 13 : if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
310 0 : errorConflictingDefElem(defel, pstate);
311 :
312 13 : opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
313 13 : opts->disableonerr = defGetBoolean(defel);
314 : }
315 134 : else if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED) &&
316 119 : strcmp(defel->defname, "password_required") == 0)
317 : {
318 16 : if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
319 0 : errorConflictingDefElem(defel, pstate);
320 :
321 16 : opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
322 16 : opts->passwordrequired = defGetBoolean(defel);
323 : }
324 118 : else if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER) &&
325 103 : strcmp(defel->defname, "run_as_owner") == 0)
326 : {
327 11 : if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
328 0 : errorConflictingDefElem(defel, pstate);
329 :
330 11 : opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
331 11 : opts->runasowner = defGetBoolean(defel);
332 : }
333 107 : else if (IsSet(supported_opts, SUBOPT_FAILOVER) &&
334 92 : strcmp(defel->defname, "failover") == 0)
335 : {
336 16 : if (IsSet(opts->specified_opts, SUBOPT_FAILOVER))
337 0 : errorConflictingDefElem(defel, pstate);
338 :
339 16 : opts->specified_opts |= SUBOPT_FAILOVER;
340 16 : opts->failover = defGetBoolean(defel);
341 : }
342 91 : else if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES) &&
343 76 : strcmp(defel->defname, "retain_dead_tuples") == 0)
344 : {
345 14 : if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
346 0 : errorConflictingDefElem(defel, pstate);
347 :
348 14 : opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
349 14 : opts->retaindeadtuples = defGetBoolean(defel);
350 : }
351 77 : else if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION) &&
352 62 : strcmp(defel->defname, "max_retention_duration") == 0)
353 : {
354 22 : if (IsSet(opts->specified_opts, SUBOPT_MAX_RETENTION_DURATION))
355 0 : errorConflictingDefElem(defel, pstate);
356 :
357 22 : opts->specified_opts |= SUBOPT_MAX_RETENTION_DURATION;
358 22 : opts->maxretention = defGetInt32(defel);
359 :
360 18 : if (opts->maxretention < 0)
361 8 : ereport(ERROR,
362 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
363 : errmsg("max_retention_duration cannot be negative"));
364 : }
365 55 : else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
366 40 : strcmp(defel->defname, "origin") == 0)
367 : {
368 24 : if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
369 0 : errorConflictingDefElem(defel, pstate);
370 :
371 24 : opts->specified_opts |= SUBOPT_ORIGIN;
372 24 : pfree(opts->origin);
373 :
374 : /*
375 : * Even though the "origin" parameter allows only "none" and "any"
376 : * values, it is implemented as a string type so that the
377 : * parameter can be extended in future versions to support
378 : * filtering using origin names specified by the user.
379 : */
380 24 : opts->origin = defGetString(defel);
381 :
382 34 : if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
383 10 : (pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
384 4 : ereport(ERROR,
385 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
386 : errmsg("unrecognized origin value: \"%s\"", opts->origin));
387 : }
388 31 : else if (IsSet(supported_opts, SUBOPT_LSN) &&
389 15 : strcmp(defel->defname, "lsn") == 0)
390 11 : {
391 15 : char *lsn_str = defGetString(defel);
392 : XLogRecPtr lsn;
393 :
394 15 : if (IsSet(opts->specified_opts, SUBOPT_LSN))
395 0 : errorConflictingDefElem(defel, pstate);
396 :
397 : /* Setting lsn = NONE is treated as resetting LSN */
398 15 : if (strcmp(lsn_str, "none") == 0)
399 4 : lsn = InvalidXLogRecPtr;
400 : else
401 : {
402 : /* Parse the argument as LSN */
403 11 : lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
404 : CStringGetDatum(lsn_str)));
405 :
406 11 : if (!XLogRecPtrIsValid(lsn))
407 4 : ereport(ERROR,
408 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
409 : errmsg("invalid WAL location (LSN): %s", lsn_str)));
410 : }
411 :
412 11 : opts->specified_opts |= SUBOPT_LSN;
413 11 : opts->lsn = lsn;
414 : }
415 16 : else if (IsSet(supported_opts, SUBOPT_WAL_RECEIVER_TIMEOUT) &&
416 16 : strcmp(defel->defname, "wal_receiver_timeout") == 0)
417 8 : {
418 : bool parsed;
419 : int val;
420 :
421 12 : if (IsSet(opts->specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
422 0 : errorConflictingDefElem(defel, pstate);
423 :
424 12 : opts->specified_opts |= SUBOPT_WAL_RECEIVER_TIMEOUT;
425 12 : opts->wal_receiver_timeout = defGetString(defel);
426 :
427 : /*
428 : * Test if the given value is valid for wal_receiver_timeout GUC.
429 : * Skip this test if the value is -1, since -1 is allowed for the
430 : * wal_receiver_timeout subscription option, but not for the GUC
431 : * itself.
432 : */
433 12 : parsed = parse_int(opts->wal_receiver_timeout, &val, 0, NULL);
434 12 : if (!parsed || val != -1)
435 8 : (void) set_config_option("wal_receiver_timeout", opts->wal_receiver_timeout,
436 : PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
437 : false, 0, false);
438 : }
439 : else
440 4 : ereport(ERROR,
441 : (errcode(ERRCODE_SYNTAX_ERROR),
442 : errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
443 : }
444 :
445 : /*
446 : * We've been explicitly asked to not connect, that requires some
447 : * additional processing.
448 : */
449 568 : if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
450 : {
451 : /* Check for incompatible options from the user. */
452 113 : if (opts->enabled &&
453 113 : IsSet(opts->specified_opts, SUBOPT_ENABLED))
454 4 : ereport(ERROR,
455 : (errcode(ERRCODE_SYNTAX_ERROR),
456 : /*- translator: both %s are strings of the form "option = value" */
457 : errmsg("%s and %s are mutually exclusive options",
458 : "connect = false", "enabled = true")));
459 :
460 109 : if (opts->create_slot &&
461 105 : IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
462 4 : ereport(ERROR,
463 : (errcode(ERRCODE_SYNTAX_ERROR),
464 : errmsg("%s and %s are mutually exclusive options",
465 : "connect = false", "create_slot = true")));
466 :
467 105 : if (opts->copy_data &&
468 101 : IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
469 4 : ereport(ERROR,
470 : (errcode(ERRCODE_SYNTAX_ERROR),
471 : errmsg("%s and %s are mutually exclusive options",
472 : "connect = false", "copy_data = true")));
473 :
474 : /* Change the defaults of other options. */
475 101 : opts->enabled = false;
476 101 : opts->create_slot = false;
477 101 : opts->copy_data = false;
478 : }
479 :
480 : /*
481 : * Do additional checking for disallowed combination when slot_name = NONE
482 : * was used.
483 : */
484 556 : if (!opts->slot_name &&
485 536 : IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
486 : {
487 93 : if (opts->enabled)
488 : {
489 12 : if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
490 4 : ereport(ERROR,
491 : (errcode(ERRCODE_SYNTAX_ERROR),
492 : /*- translator: both %s are strings of the form "option = value" */
493 : errmsg("%s and %s are mutually exclusive options",
494 : "slot_name = NONE", "enabled = true")));
495 : else
496 8 : ereport(ERROR,
497 : (errcode(ERRCODE_SYNTAX_ERROR),
498 : /*- translator: both %s are strings of the form "option = value" */
499 : errmsg("subscription with %s must also set %s",
500 : "slot_name = NONE", "enabled = false")));
501 : }
502 :
503 81 : if (opts->create_slot)
504 : {
505 8 : if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
506 4 : ereport(ERROR,
507 : (errcode(ERRCODE_SYNTAX_ERROR),
508 : /*- translator: both %s are strings of the form "option = value" */
509 : errmsg("%s and %s are mutually exclusive options",
510 : "slot_name = NONE", "create_slot = true")));
511 : else
512 4 : ereport(ERROR,
513 : (errcode(ERRCODE_SYNTAX_ERROR),
514 : /*- translator: both %s are strings of the form "option = value" */
515 : errmsg("subscription with %s must also set %s",
516 : "slot_name = NONE", "create_slot = false")));
517 : }
518 : }
519 536 : }
520 :
521 : /*
522 : * Check that the specified publications are present on the publisher.
523 : */
524 : static void
525 135 : check_publications(WalReceiverConn *wrconn, List *publications)
526 : {
527 : WalRcvExecResult *res;
528 : StringInfoData cmd;
529 : TupleTableSlot *slot;
530 135 : List *publicationsCopy = NIL;
531 135 : Oid tableRow[1] = {TEXTOID};
532 :
533 135 : initStringInfo(&cmd);
534 135 : appendStringInfoString(&cmd, "SELECT t.pubname FROM\n"
535 : " pg_catalog.pg_publication t WHERE\n"
536 : " t.pubname IN (");
537 135 : GetPublicationsStr(publications, &cmd, true);
538 135 : appendStringInfoChar(&cmd, ')');
539 :
540 135 : res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
541 135 : pfree(cmd.data);
542 :
543 135 : if (res->status != WALRCV_OK_TUPLES)
544 0 : ereport(ERROR,
545 : errmsg("could not receive list of publications from the publisher: %s",
546 : res->err));
547 :
548 135 : publicationsCopy = list_copy(publications);
549 :
550 : /* Process publication(s). */
551 135 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
552 299 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
553 : {
554 : char *pubname;
555 : bool isnull;
556 :
557 164 : pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
558 : Assert(!isnull);
559 :
560 : /* Delete the publication present in publisher from the list. */
561 164 : publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
562 164 : ExecClearTuple(slot);
563 : }
564 :
565 135 : ExecDropSingleTupleTableSlot(slot);
566 :
567 135 : walrcv_clear_result(res);
568 :
569 135 : if (list_length(publicationsCopy))
570 : {
571 : /* Prepare the list of non-existent publication(s) for error message. */
572 : StringInfoData pubnames;
573 :
574 4 : initStringInfo(&pubnames);
575 :
576 4 : GetPublicationsStr(publicationsCopy, &pubnames, false);
577 4 : ereport(WARNING,
578 : errcode(ERRCODE_UNDEFINED_OBJECT),
579 : errmsg_plural("publication %s does not exist on the publisher",
580 : "publications %s do not exist on the publisher",
581 : list_length(publicationsCopy),
582 : pubnames.data));
583 : }
584 135 : }
585 :
586 : /*
587 : * Auxiliary function to build a text array out of a list of String nodes.
588 : */
589 : static Datum
590 229 : publicationListToArray(List *publist)
591 : {
592 : ArrayType *arr;
593 : Datum *datums;
594 : MemoryContext memcxt;
595 : MemoryContext oldcxt;
596 :
597 : /* Create memory context for temporary allocations. */
598 229 : memcxt = AllocSetContextCreate(CurrentMemoryContext,
599 : "publicationListToArray to array",
600 : ALLOCSET_DEFAULT_SIZES);
601 229 : oldcxt = MemoryContextSwitchTo(memcxt);
602 :
603 229 : datums = palloc_array(Datum, list_length(publist));
604 :
605 229 : check_duplicates_in_publist(publist, datums);
606 :
607 225 : MemoryContextSwitchTo(oldcxt);
608 :
609 225 : arr = construct_array_builtin(datums, list_length(publist), TEXTOID);
610 :
611 225 : MemoryContextDelete(memcxt);
612 :
613 225 : return PointerGetDatum(arr);
614 : }
615 :
616 : /*
617 : * Create new subscription.
618 : */
619 : ObjectAddress
620 308 : CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
621 : bool isTopLevel)
622 : {
623 : Relation rel;
624 : ObjectAddress myself;
625 : Oid subid;
626 : bool nulls[Natts_pg_subscription];
627 : Datum values[Natts_pg_subscription];
628 308 : Oid owner = GetUserId();
629 : HeapTuple tup;
630 : Oid serverid;
631 : char *conninfo;
632 : char originname[NAMEDATALEN];
633 : List *publications;
634 : uint32 supported_opts;
635 308 : SubOpts opts = {0};
636 : AclResult aclresult;
637 :
638 : /*
639 : * Parse and check options.
640 : *
641 : * Connection and publication should not be specified here.
642 : */
643 308 : supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
644 : SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
645 : SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
646 : SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
647 : SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
648 : SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
649 : SUBOPT_RETAIN_DEAD_TUPLES |
650 : SUBOPT_MAX_RETENTION_DURATION |
651 : SUBOPT_WAL_RECEIVER_TIMEOUT | SUBOPT_ORIGIN);
652 308 : parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
653 :
654 : /*
655 : * Since creating a replication slot is not transactional, rolling back
656 : * the transaction leaves the created replication slot. So we cannot run
657 : * CREATE SUBSCRIPTION inside a transaction block if creating a
658 : * replication slot.
659 : */
660 244 : if (opts.create_slot)
661 138 : PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
662 :
663 : /*
664 : * We don't want to allow unprivileged users to be able to trigger
665 : * attempts to access arbitrary network destinations, so require the user
666 : * to have been specifically authorized to create subscriptions.
667 : */
668 240 : if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
669 4 : ereport(ERROR,
670 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
671 : errmsg("permission denied to create subscription"),
672 : errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
673 : "pg_create_subscription")));
674 :
675 : /*
676 : * Since a subscription is a database object, we also check for CREATE
677 : * permission on the database.
678 : */
679 236 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
680 : owner, ACL_CREATE);
681 236 : if (aclresult != ACLCHECK_OK)
682 8 : aclcheck_error(aclresult, OBJECT_DATABASE,
683 4 : get_database_name(MyDatabaseId));
684 :
685 : /*
686 : * Non-superusers are required to set a password for authentication, and
687 : * that password must be used by the target server, but the superuser can
688 : * exempt a subscription from this requirement.
689 : */
690 232 : if (!opts.passwordrequired && !superuser_arg(owner))
691 4 : ereport(ERROR,
692 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
693 : errmsg("password_required=false is superuser-only"),
694 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
695 :
696 : /*
697 : * If built with appropriate switch, whine when regression-testing
698 : * conventions for subscription names are violated.
699 : */
700 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
701 : if (strncmp(stmt->subname, "regress_", 8) != 0)
702 : elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
703 : #endif
704 :
705 228 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
706 :
707 : /* Check if name is used */
708 228 : subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
709 : ObjectIdGetDatum(MyDatabaseId), CStringGetDatum(stmt->subname));
710 228 : if (OidIsValid(subid))
711 : {
712 5 : ereport(ERROR,
713 : (errcode(ERRCODE_DUPLICATE_OBJECT),
714 : errmsg("subscription \"%s\" already exists",
715 : stmt->subname)));
716 : }
717 :
718 : /*
719 : * Ensure that system configuration parameters are set appropriately to
720 : * support retain_dead_tuples and max_retention_duration.
721 : */
722 223 : CheckSubDeadTupleRetention(true, !opts.enabled, WARNING,
723 223 : opts.retaindeadtuples, opts.retaindeadtuples,
724 223 : (opts.maxretention > 0));
725 :
726 223 : if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
727 181 : opts.slot_name == NULL)
728 181 : opts.slot_name = stmt->subname;
729 :
730 : /* The default for synchronous_commit of subscriptions is off. */
731 223 : if (opts.synchronous_commit == NULL)
732 223 : opts.synchronous_commit = "off";
733 :
734 : /*
735 : * The default for wal_receiver_timeout of subscriptions is -1, which
736 : * means the value is inherited from the server configuration, command
737 : * line, or role/database settings.
738 : */
739 223 : if (opts.wal_receiver_timeout == NULL)
740 223 : opts.wal_receiver_timeout = "-1";
741 :
742 : /* Load the library providing us libpq calls. */
743 223 : load_file("libpqwalreceiver", false);
744 :
745 223 : if (stmt->servername)
746 : {
747 : ForeignServer *server;
748 :
749 : Assert(!stmt->conninfo);
750 18 : conninfo = NULL;
751 :
752 18 : server = GetForeignServerByName(stmt->servername, false);
753 18 : aclresult = object_aclcheck(ForeignServerRelationId, server->serverid, owner, ACL_USAGE);
754 18 : if (aclresult != ACLCHECK_OK)
755 4 : aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername);
756 :
757 : /* make sure a user mapping exists */
758 14 : GetUserMapping(owner, server->serverid);
759 :
760 10 : serverid = server->serverid;
761 10 : conninfo = ForeignServerConnectionString(owner, server);
762 : }
763 : else
764 : {
765 : Assert(stmt->conninfo);
766 :
767 205 : serverid = InvalidOid;
768 205 : conninfo = stmt->conninfo;
769 : }
770 :
771 : /* Check the connection info string. */
772 211 : walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
773 :
774 199 : publications = stmt->publication;
775 :
776 : /* Everything ok, form a new tuple. */
777 199 : memset(values, 0, sizeof(values));
778 199 : memset(nulls, false, sizeof(nulls));
779 :
780 199 : subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
781 : Anum_pg_subscription_oid);
782 199 : values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
783 199 : values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
784 199 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
785 199 : values[Anum_pg_subscription_subname - 1] =
786 199 : DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
787 199 : values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
788 199 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
789 199 : values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
790 199 : values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
791 199 : values[Anum_pg_subscription_subtwophasestate - 1] =
792 199 : CharGetDatum(opts.twophase ?
793 : LOGICALREP_TWOPHASE_STATE_PENDING :
794 : LOGICALREP_TWOPHASE_STATE_DISABLED);
795 199 : values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
796 199 : values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
797 199 : values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
798 199 : values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
799 199 : values[Anum_pg_subscription_subretaindeadtuples - 1] =
800 199 : BoolGetDatum(opts.retaindeadtuples);
801 199 : values[Anum_pg_subscription_submaxretention - 1] =
802 199 : Int32GetDatum(opts.maxretention);
803 199 : values[Anum_pg_subscription_subretentionactive - 1] =
804 199 : BoolGetDatum(opts.retaindeadtuples);
805 199 : values[Anum_pg_subscription_subserver - 1] = ObjectIdGetDatum(serverid);
806 199 : if (!OidIsValid(serverid))
807 193 : values[Anum_pg_subscription_subconninfo - 1] =
808 193 : CStringGetTextDatum(conninfo);
809 : else
810 6 : nulls[Anum_pg_subscription_subconninfo - 1] = true;
811 199 : if (opts.slot_name)
812 185 : values[Anum_pg_subscription_subslotname - 1] =
813 185 : DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
814 : else
815 14 : nulls[Anum_pg_subscription_subslotname - 1] = true;
816 199 : values[Anum_pg_subscription_subsynccommit - 1] =
817 199 : CStringGetTextDatum(opts.synchronous_commit);
818 199 : values[Anum_pg_subscription_subwalrcvtimeout - 1] =
819 199 : CStringGetTextDatum(opts.wal_receiver_timeout);
820 195 : values[Anum_pg_subscription_subpublications - 1] =
821 199 : publicationListToArray(publications);
822 195 : values[Anum_pg_subscription_suborigin - 1] =
823 195 : CStringGetTextDatum(opts.origin);
824 :
825 195 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
826 :
827 : /* Insert tuple into catalog. */
828 195 : CatalogTupleInsert(rel, tup);
829 195 : heap_freetuple(tup);
830 :
831 195 : recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
832 :
833 195 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
834 :
835 195 : if (stmt->servername)
836 : {
837 : ObjectAddress referenced;
838 :
839 : Assert(OidIsValid(serverid));
840 :
841 6 : ObjectAddressSet(referenced, ForeignServerRelationId, serverid);
842 6 : recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
843 : }
844 :
845 : /*
846 : * A replication origin is currently created for all subscriptions,
847 : * including those that only contain sequences or are otherwise empty.
848 : *
849 : * XXX: While this is technically unnecessary, optimizing it would require
850 : * additional logic to skip origin creation during DDL operations and
851 : * apply workers initialization, and to handle origin creation dynamically
852 : * when tables are added to the subscription. It is not clear whether
853 : * preventing creation of origins is worth additional complexity.
854 : */
855 195 : ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
856 195 : replorigin_create(originname);
857 :
858 : /*
859 : * Connect to remote side to execute requested commands and fetch table
860 : * and sequence info.
861 : */
862 195 : if (opts.connect)
863 : {
864 : char *err;
865 : WalReceiverConn *wrconn;
866 : bool must_use_password;
867 :
868 : /* Try to connect to the publisher. */
869 130 : must_use_password = !superuser_arg(owner) && opts.passwordrequired;
870 130 : wrconn = walrcv_connect(conninfo, true, true, must_use_password,
871 : stmt->subname, &err);
872 130 : if (!wrconn)
873 4 : ereport(ERROR,
874 : (errcode(ERRCODE_CONNECTION_FAILURE),
875 : errmsg("subscription \"%s\" could not connect to the publisher: %s",
876 : stmt->subname, err)));
877 :
878 126 : PG_TRY();
879 : {
880 126 : bool has_tables = false;
881 : List *pubrels;
882 : char relation_state;
883 :
884 126 : check_publications(wrconn, publications);
885 126 : check_publications_origin_tables(wrconn, publications,
886 126 : opts.copy_data,
887 126 : opts.retaindeadtuples, opts.origin,
888 : NULL, 0, stmt->subname);
889 126 : check_publications_origin_sequences(wrconn, publications,
890 126 : opts.copy_data, opts.origin,
891 : NULL, 0, stmt->subname);
892 :
893 126 : if (opts.retaindeadtuples)
894 3 : check_pub_dead_tuple_retention(wrconn);
895 :
896 : /*
897 : * Set sync state based on if we were asked to do data copy or
898 : * not.
899 : */
900 126 : relation_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
901 :
902 : /*
903 : * Build local relation status info. Relations are for both tables
904 : * and sequences from the publisher.
905 : */
906 126 : pubrels = fetch_relation_list(wrconn, publications);
907 :
908 445 : foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
909 : {
910 : Oid relid;
911 : char relkind;
912 195 : RangeVar *rv = pubrelinfo->rv;
913 :
914 195 : relid = RangeVarGetRelid(rv, AccessShareLock, false);
915 195 : relkind = get_rel_relkind(relid);
916 :
917 : /* Check for supported relkind. */
918 195 : CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
919 195 : rv->schemaname, rv->relname);
920 195 : has_tables |= (relkind != RELKIND_SEQUENCE);
921 195 : AddSubscriptionRelState(subid, relid, relation_state,
922 : InvalidXLogRecPtr, true);
923 : }
924 :
925 : /*
926 : * If requested, create permanent slot for the subscription. We
927 : * won't use the initial snapshot for anything, so no need to
928 : * export it.
929 : *
930 : * XXX: Similar to origins, it is not clear whether preventing the
931 : * slot creation for empty and sequence-only subscriptions is
932 : * worth additional complexity.
933 : */
934 125 : if (opts.create_slot)
935 : {
936 120 : bool twophase_enabled = false;
937 :
938 : Assert(opts.slot_name);
939 :
940 : /*
941 : * Even if two_phase is set, don't create the slot with
942 : * two-phase enabled. Will enable it once all the tables are
943 : * synced and ready. This avoids race-conditions like prepared
944 : * transactions being skipped due to changes not being applied
945 : * due to checks in should_apply_changes_for_rel() when
946 : * tablesync for the corresponding tables are in progress. See
947 : * comments atop worker.c.
948 : *
949 : * Note that if tables were specified but copy_data is false
950 : * then it is safe to enable two_phase up-front because those
951 : * tables are already initially in READY state. When the
952 : * subscription has no tables, we leave the twophase state as
953 : * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
954 : * PUBLICATION to work.
955 : */
956 120 : if (opts.twophase && !opts.copy_data && has_tables)
957 1 : twophase_enabled = true;
958 :
959 120 : walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
960 : opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
961 :
962 120 : if (twophase_enabled)
963 1 : UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
964 :
965 120 : ereport(NOTICE,
966 : (errmsg("created replication slot \"%s\" on publisher",
967 : opts.slot_name)));
968 : }
969 : }
970 1 : PG_FINALLY();
971 : {
972 126 : walrcv_disconnect(wrconn);
973 : }
974 126 : PG_END_TRY();
975 : }
976 : else
977 65 : ereport(WARNING,
978 : (errmsg("subscription was created, but is not connected"),
979 : errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.")));
980 :
981 190 : table_close(rel, RowExclusiveLock);
982 :
983 190 : pgstat_create_subscription(subid);
984 :
985 : /*
986 : * Notify the launcher to start the apply worker if the subscription is
987 : * enabled, or to create the conflict detection slot if retain_dead_tuples
988 : * is enabled.
989 : *
990 : * Creating the conflict detection slot is essential even when the
991 : * subscription is not enabled. This ensures that dead tuples are
992 : * retained, which is necessary for accurately identifying the type of
993 : * conflict during replication.
994 : */
995 190 : if (opts.enabled || opts.retaindeadtuples)
996 119 : ApplyLauncherWakeupAtCommit();
997 :
998 190 : InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
999 :
1000 190 : return myself;
1001 : }
1002 :
1003 : static void
1004 39 : AlterSubscription_refresh(Subscription *sub, bool copy_data,
1005 : List *validate_publications)
1006 : {
1007 : char *err;
1008 39 : List *pubrels = NIL;
1009 : Oid *pubrel_local_oids;
1010 : List *subrel_states;
1011 39 : List *sub_remove_rels = NIL;
1012 : Oid *subrel_local_oids;
1013 : Oid *subseq_local_oids;
1014 : int subrel_count;
1015 : ListCell *lc;
1016 : int off;
1017 39 : int tbl_count = 0;
1018 39 : int seq_count = 0;
1019 39 : Relation rel = NULL;
1020 : typedef struct SubRemoveRels
1021 : {
1022 : Oid relid;
1023 : char state;
1024 : } SubRemoveRels;
1025 :
1026 : WalReceiverConn *wrconn;
1027 : bool must_use_password;
1028 :
1029 : /* Load the library providing us libpq calls. */
1030 39 : load_file("libpqwalreceiver", false);
1031 :
1032 : /* Try to connect to the publisher. */
1033 39 : must_use_password = sub->passwordrequired && !sub->ownersuperuser;
1034 39 : wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
1035 : sub->name, &err);
1036 38 : if (!wrconn)
1037 0 : ereport(ERROR,
1038 : (errcode(ERRCODE_CONNECTION_FAILURE),
1039 : errmsg("subscription \"%s\" could not connect to the publisher: %s",
1040 : sub->name, err)));
1041 :
1042 38 : PG_TRY();
1043 : {
1044 38 : if (validate_publications)
1045 9 : check_publications(wrconn, validate_publications);
1046 :
1047 : /* Get the relation list from publisher. */
1048 38 : pubrels = fetch_relation_list(wrconn, sub->publications);
1049 :
1050 : /* Get local relation list. */
1051 38 : subrel_states = GetSubscriptionRelations(sub->oid, true, true, false);
1052 38 : subrel_count = list_length(subrel_states);
1053 :
1054 : /*
1055 : * Build qsorted arrays of local table oids and sequence oids for
1056 : * faster lookup. This can potentially contain all tables and
1057 : * sequences in the database so speed of lookup is important.
1058 : *
1059 : * We do not yet know the exact count of tables and sequences, so we
1060 : * allocate separate arrays for table OIDs and sequence OIDs based on
1061 : * the total number of relations (subrel_count).
1062 : */
1063 38 : subrel_local_oids = palloc(subrel_count * sizeof(Oid));
1064 38 : subseq_local_oids = palloc(subrel_count * sizeof(Oid));
1065 136 : foreach(lc, subrel_states)
1066 : {
1067 98 : SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
1068 :
1069 98 : if (get_rel_relkind(relstate->relid) == RELKIND_SEQUENCE)
1070 9 : subseq_local_oids[seq_count++] = relstate->relid;
1071 : else
1072 89 : subrel_local_oids[tbl_count++] = relstate->relid;
1073 : }
1074 :
1075 38 : qsort(subrel_local_oids, tbl_count, sizeof(Oid), oid_cmp);
1076 38 : check_publications_origin_tables(wrconn, sub->publications, copy_data,
1077 38 : sub->retaindeadtuples, sub->origin,
1078 : subrel_local_oids, tbl_count,
1079 : sub->name);
1080 :
1081 38 : qsort(subseq_local_oids, seq_count, sizeof(Oid), oid_cmp);
1082 38 : check_publications_origin_sequences(wrconn, sub->publications,
1083 : copy_data, sub->origin,
1084 : subseq_local_oids, seq_count,
1085 : sub->name);
1086 :
1087 : /*
1088 : * Walk over the remote relations and try to match them to locally
1089 : * known relations. If the relation is not known locally create a new
1090 : * state for it.
1091 : *
1092 : * Also builds array of local oids of remote relations for the next
1093 : * step.
1094 : */
1095 38 : off = 0;
1096 38 : pubrel_local_oids = palloc(list_length(pubrels) * sizeof(Oid));
1097 :
1098 183 : foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
1099 : {
1100 107 : RangeVar *rv = pubrelinfo->rv;
1101 : Oid relid;
1102 : char relkind;
1103 :
1104 107 : relid = RangeVarGetRelid(rv, AccessShareLock, false);
1105 107 : relkind = get_rel_relkind(relid);
1106 :
1107 : /* Check for supported relkind. */
1108 107 : CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
1109 107 : rv->schemaname, rv->relname);
1110 :
1111 107 : pubrel_local_oids[off++] = relid;
1112 :
1113 107 : if (!bsearch(&relid, subrel_local_oids,
1114 39 : tbl_count, sizeof(Oid), oid_cmp) &&
1115 39 : !bsearch(&relid, subseq_local_oids,
1116 : seq_count, sizeof(Oid), oid_cmp))
1117 : {
1118 30 : AddSubscriptionRelState(sub->oid, relid,
1119 : copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
1120 : InvalidXLogRecPtr, true);
1121 30 : ereport(DEBUG1,
1122 : errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"",
1123 : relkind == RELKIND_SEQUENCE ? "sequence" : "table",
1124 : rv->schemaname, rv->relname, sub->name));
1125 : }
1126 : }
1127 :
1128 : /*
1129 : * Next remove state for tables we should not care about anymore using
1130 : * the data we collected above
1131 : */
1132 38 : qsort(pubrel_local_oids, list_length(pubrels), sizeof(Oid), oid_cmp);
1133 :
1134 127 : for (off = 0; off < tbl_count; off++)
1135 : {
1136 89 : Oid relid = subrel_local_oids[off];
1137 :
1138 89 : if (!bsearch(&relid, pubrel_local_oids,
1139 89 : list_length(pubrels), sizeof(Oid), oid_cmp))
1140 : {
1141 : char state;
1142 : XLogRecPtr statelsn;
1143 21 : SubRemoveRels *remove_rel = palloc_object(SubRemoveRels);
1144 :
1145 : /*
1146 : * Lock pg_subscription_rel with AccessExclusiveLock to
1147 : * prevent any race conditions with the apply worker
1148 : * re-launching workers at the same time this code is trying
1149 : * to remove those tables.
1150 : *
1151 : * Even if new worker for this particular rel is restarted it
1152 : * won't be able to make any progress as we hold exclusive
1153 : * lock on pg_subscription_rel till the transaction end. It
1154 : * will simply exit as there is no corresponding rel entry.
1155 : *
1156 : * This locking also ensures that the state of rels won't
1157 : * change till we are done with this refresh operation.
1158 : */
1159 21 : if (!rel)
1160 9 : rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
1161 :
1162 : /* Last known rel state. */
1163 21 : state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
1164 :
1165 21 : RemoveSubscriptionRel(sub->oid, relid);
1166 :
1167 21 : remove_rel->relid = relid;
1168 21 : remove_rel->state = state;
1169 :
1170 21 : sub_remove_rels = lappend(sub_remove_rels, remove_rel);
1171 :
1172 21 : logicalrep_worker_stop(WORKERTYPE_TABLESYNC, sub->oid, relid);
1173 :
1174 : /*
1175 : * For READY state, we would have already dropped the
1176 : * tablesync origin.
1177 : */
1178 21 : if (state != SUBREL_STATE_READY)
1179 : {
1180 : char originname[NAMEDATALEN];
1181 :
1182 : /*
1183 : * Drop the tablesync's origin tracking if exists.
1184 : *
1185 : * It is possible that the origin is not yet created for
1186 : * tablesync worker, this can happen for the states before
1187 : * SUBREL_STATE_DATASYNC. The tablesync worker or apply
1188 : * worker can also concurrently try to drop the origin and
1189 : * by this time the origin might be already removed. For
1190 : * these reasons, passing missing_ok = true.
1191 : */
1192 0 : ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
1193 : sizeof(originname));
1194 0 : replorigin_drop_by_name(originname, true, false);
1195 : }
1196 :
1197 21 : ereport(DEBUG1,
1198 : (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
1199 : get_namespace_name(get_rel_namespace(relid)),
1200 : get_rel_name(relid),
1201 : sub->name)));
1202 : }
1203 : }
1204 :
1205 : /*
1206 : * Drop the tablesync slots associated with removed tables. This has
1207 : * to be at the end because otherwise if there is an error while doing
1208 : * the database operations we won't be able to rollback dropped slots.
1209 : */
1210 97 : foreach_ptr(SubRemoveRels, sub_remove_rel, sub_remove_rels)
1211 : {
1212 21 : if (sub_remove_rel->state != SUBREL_STATE_READY &&
1213 0 : sub_remove_rel->state != SUBREL_STATE_SYNCDONE)
1214 : {
1215 0 : char syncslotname[NAMEDATALEN] = {0};
1216 :
1217 : /*
1218 : * For READY/SYNCDONE states we know the tablesync slot has
1219 : * already been dropped by the tablesync worker.
1220 : *
1221 : * For other states, there is no certainty, maybe the slot
1222 : * does not exist yet. Also, if we fail after removing some of
1223 : * the slots, next time, it will again try to drop already
1224 : * dropped slots and fail. For these reasons, we allow
1225 : * missing_ok = true for the drop.
1226 : */
1227 0 : ReplicationSlotNameForTablesync(sub->oid, sub_remove_rel->relid,
1228 : syncslotname, sizeof(syncslotname));
1229 0 : ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
1230 : }
1231 : }
1232 :
1233 : /*
1234 : * Next remove state for sequences we should not care about anymore
1235 : * using the data we collected above
1236 : */
1237 47 : for (off = 0; off < seq_count; off++)
1238 : {
1239 9 : Oid relid = subseq_local_oids[off];
1240 :
1241 9 : if (!bsearch(&relid, pubrel_local_oids,
1242 9 : list_length(pubrels), sizeof(Oid), oid_cmp))
1243 : {
1244 : /*
1245 : * This locking ensures that the state of rels won't change
1246 : * till we are done with this refresh operation.
1247 : */
1248 0 : if (!rel)
1249 0 : rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
1250 :
1251 0 : RemoveSubscriptionRel(sub->oid, relid);
1252 :
1253 0 : ereport(DEBUG1,
1254 : errmsg_internal("sequence \"%s.%s\" removed from subscription \"%s\"",
1255 : get_namespace_name(get_rel_namespace(relid)),
1256 : get_rel_name(relid),
1257 : sub->name));
1258 : }
1259 : }
1260 : }
1261 0 : PG_FINALLY();
1262 : {
1263 38 : walrcv_disconnect(wrconn);
1264 : }
1265 38 : PG_END_TRY();
1266 :
1267 38 : if (rel)
1268 9 : table_close(rel, NoLock);
1269 38 : }
1270 :
1271 : /*
1272 : * Marks all sequences with INIT state.
1273 : */
1274 : static void
1275 2 : AlterSubscription_refresh_seq(Subscription *sub)
1276 : {
1277 2 : char *err = NULL;
1278 : WalReceiverConn *wrconn;
1279 : bool must_use_password;
1280 :
1281 : /* Load the library providing us libpq calls. */
1282 2 : load_file("libpqwalreceiver", false);
1283 :
1284 : /* Try to connect to the publisher. */
1285 2 : must_use_password = sub->passwordrequired && !sub->ownersuperuser;
1286 2 : wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
1287 : sub->name, &err);
1288 2 : if (!wrconn)
1289 0 : ereport(ERROR,
1290 : errcode(ERRCODE_CONNECTION_FAILURE),
1291 : errmsg("subscription \"%s\" could not connect to the publisher: %s",
1292 : sub->name, err));
1293 :
1294 2 : PG_TRY();
1295 : {
1296 : List *subrel_states;
1297 :
1298 2 : check_publications_origin_sequences(wrconn, sub->publications, true,
1299 : sub->origin, NULL, 0, sub->name);
1300 :
1301 : /* Get local sequence list. */
1302 2 : subrel_states = GetSubscriptionRelations(sub->oid, false, true, false);
1303 12 : foreach_ptr(SubscriptionRelState, subrel, subrel_states)
1304 : {
1305 8 : Oid relid = subrel->relid;
1306 :
1307 8 : UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT,
1308 : InvalidXLogRecPtr, false);
1309 8 : ereport(DEBUG1,
1310 : errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
1311 : get_namespace_name(get_rel_namespace(relid)),
1312 : get_rel_name(relid),
1313 : sub->name));
1314 : }
1315 : }
1316 0 : PG_FINALLY();
1317 : {
1318 2 : walrcv_disconnect(wrconn);
1319 : }
1320 2 : PG_END_TRY();
1321 2 : }
1322 :
1323 : /*
1324 : * Common checks for altering failover, two_phase, and retain_dead_tuples
1325 : * options.
1326 : */
1327 : static void
1328 14 : CheckAlterSubOption(Subscription *sub, const char *option,
1329 : bool slot_needs_update, bool isTopLevel)
1330 : {
1331 : Assert(strcmp(option, "failover") == 0 ||
1332 : strcmp(option, "two_phase") == 0 ||
1333 : strcmp(option, "retain_dead_tuples") == 0);
1334 :
1335 : /*
1336 : * Altering the retain_dead_tuples option does not update the slot on the
1337 : * publisher.
1338 : */
1339 : Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0);
1340 :
1341 : /*
1342 : * Do not allow changing the option if the subscription is enabled. This
1343 : * is because both failover and two_phase options of the slot on the
1344 : * publisher cannot be modified if the slot is currently acquired by the
1345 : * existing walsender.
1346 : *
1347 : * Note that two_phase is enabled (aka changed from 'false' to 'true') on
1348 : * the publisher by the existing walsender, so we could have allowed that
1349 : * even when the subscription is enabled. But we kept this restriction for
1350 : * the sake of consistency and simplicity.
1351 : *
1352 : * Additionally, do not allow changing the retain_dead_tuples option when
1353 : * the subscription is enabled to prevent race conditions arising from the
1354 : * new option value being acknowledged asynchronously by the launcher and
1355 : * apply workers.
1356 : *
1357 : * Without the restriction, a race condition may arise when a user
1358 : * disables and immediately re-enables the retain_dead_tuples option. In
1359 : * this case, the launcher might drop the slot upon noticing the disabled
1360 : * action, while the apply worker may keep maintaining
1361 : * oldest_nonremovable_xid without noticing the option change. During this
1362 : * period, a transaction ID wraparound could falsely make this ID appear
1363 : * as if it originates from the future w.r.t the transaction ID stored in
1364 : * the slot maintained by launcher.
1365 : *
1366 : * Similarly, if the user enables retain_dead_tuples concurrently with the
1367 : * launcher starting the worker, the apply worker may start calculating
1368 : * oldest_nonremovable_xid before the launcher notices the enable action.
1369 : * Consequently, the launcher may update slot.xmin to a newer value than
1370 : * that maintained by the worker. In subsequent cycles, upon integrating
1371 : * the worker's oldest_nonremovable_xid, the launcher might detect a
1372 : * retreat in the calculated xmin, necessitating additional handling.
1373 : *
1374 : * XXX To address the above race conditions, we can define
1375 : * oldest_nonremovable_xid as FullTransactionId and adds the check to
1376 : * disallow retreating the conflict slot's xmin. For now, we kept the
1377 : * implementation simple by disallowing change to the retain_dead_tuples,
1378 : * but in the future we can change this after some more analysis.
1379 : *
1380 : * Note that we could restrict only the enabling of retain_dead_tuples to
1381 : * avoid the race conditions described above, but we maintain the
1382 : * restriction for both enable and disable operations for the sake of
1383 : * consistency.
1384 : */
1385 14 : if (sub->enabled)
1386 2 : ereport(ERROR,
1387 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1388 : errmsg("cannot set option \"%s\" for enabled subscription",
1389 : option)));
1390 :
1391 12 : if (slot_needs_update)
1392 : {
1393 : StringInfoData cmd;
1394 :
1395 : /*
1396 : * A valid slot must be associated with the subscription for us to
1397 : * modify any of the slot's properties.
1398 : */
1399 9 : if (!sub->slotname)
1400 0 : ereport(ERROR,
1401 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1402 : errmsg("cannot set option \"%s\" for a subscription that does not have a slot name",
1403 : option)));
1404 :
1405 : /* The changed option of the slot can't be rolled back. */
1406 9 : initStringInfo(&cmd);
1407 9 : appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
1408 :
1409 9 : PreventInTransactionBlock(isTopLevel, cmd.data);
1410 5 : pfree(cmd.data);
1411 : }
1412 8 : }
1413 :
1414 : /*
1415 : * Alter the existing subscription.
1416 : */
1417 : ObjectAddress
1418 341 : AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1419 : bool isTopLevel)
1420 : {
1421 : Relation rel;
1422 : ObjectAddress myself;
1423 : bool nulls[Natts_pg_subscription];
1424 : bool replaces[Natts_pg_subscription];
1425 : Datum values[Natts_pg_subscription];
1426 : HeapTuple tup;
1427 : Oid subid;
1428 341 : bool update_tuple = false;
1429 341 : bool update_failover = false;
1430 341 : bool update_two_phase = false;
1431 341 : bool check_pub_rdt = false;
1432 : bool retain_dead_tuples;
1433 : int max_retention;
1434 : bool retention_active;
1435 341 : char *new_conninfo = NULL;
1436 : char *origin;
1437 : Subscription *sub;
1438 : Form_pg_subscription form;
1439 : uint32 supported_opts;
1440 341 : SubOpts opts = {0};
1441 :
1442 341 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1443 :
1444 : /* Fetch the existing tuple. */
1445 341 : tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
1446 : CStringGetDatum(stmt->subname));
1447 :
1448 341 : if (!HeapTupleIsValid(tup))
1449 4 : ereport(ERROR,
1450 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1451 : errmsg("subscription \"%s\" does not exist",
1452 : stmt->subname)));
1453 :
1454 337 : form = (Form_pg_subscription) GETSTRUCT(tup);
1455 337 : subid = form->oid;
1456 :
1457 : /* must be owner */
1458 337 : if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
1459 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
1460 0 : stmt->subname);
1461 :
1462 : /*
1463 : * Skip ACL checks on the subscription's foreign server, if any. If
1464 : * changing the server (or replacing it with a raw connection), then the
1465 : * old one will be removed anyway. If changing something unrelated,
1466 : * there's no need to do an additional ACL check here; that will be done
1467 : * by the subscription worker anyway.
1468 : */
1469 337 : sub = GetSubscription(subid, false, false);
1470 :
1471 337 : retain_dead_tuples = sub->retaindeadtuples;
1472 337 : origin = sub->origin;
1473 337 : max_retention = sub->maxretention;
1474 337 : retention_active = sub->retentionactive;
1475 :
1476 : /*
1477 : * Don't allow non-superuser modification of a subscription with
1478 : * password_required=false.
1479 : */
1480 337 : if (!sub->passwordrequired && !superuser())
1481 0 : ereport(ERROR,
1482 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1483 : errmsg("password_required=false is superuser-only"),
1484 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1485 :
1486 : /* Lock the subscription so nobody else can do anything with it. */
1487 337 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
1488 :
1489 : /* Form a new tuple. */
1490 337 : memset(values, 0, sizeof(values));
1491 337 : memset(nulls, false, sizeof(nulls));
1492 337 : memset(replaces, false, sizeof(replaces));
1493 :
1494 337 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
1495 :
1496 337 : switch (stmt->kind)
1497 : {
1498 156 : case ALTER_SUBSCRIPTION_OPTIONS:
1499 : {
1500 156 : supported_opts = (SUBOPT_SLOT_NAME |
1501 : SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
1502 : SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
1503 : SUBOPT_DISABLE_ON_ERR |
1504 : SUBOPT_PASSWORD_REQUIRED |
1505 : SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
1506 : SUBOPT_RETAIN_DEAD_TUPLES |
1507 : SUBOPT_MAX_RETENTION_DURATION |
1508 : SUBOPT_WAL_RECEIVER_TIMEOUT |
1509 : SUBOPT_ORIGIN);
1510 :
1511 156 : parse_subscription_options(pstate, stmt->options,
1512 : supported_opts, &opts);
1513 :
1514 136 : if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1515 : {
1516 : /*
1517 : * The subscription must be disabled to allow slot_name as
1518 : * 'none', otherwise, the apply worker will repeatedly try
1519 : * to stream the data using that slot_name which neither
1520 : * exists on the publisher nor the user will be allowed to
1521 : * create it.
1522 : */
1523 51 : if (sub->enabled && !opts.slot_name)
1524 0 : ereport(ERROR,
1525 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1526 : errmsg("cannot set %s for enabled subscription",
1527 : "slot_name = NONE")));
1528 :
1529 51 : if (opts.slot_name)
1530 4 : values[Anum_pg_subscription_subslotname - 1] =
1531 4 : DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
1532 : else
1533 47 : nulls[Anum_pg_subscription_subslotname - 1] = true;
1534 51 : replaces[Anum_pg_subscription_subslotname - 1] = true;
1535 : }
1536 :
1537 136 : if (opts.synchronous_commit)
1538 : {
1539 4 : values[Anum_pg_subscription_subsynccommit - 1] =
1540 4 : CStringGetTextDatum(opts.synchronous_commit);
1541 4 : replaces[Anum_pg_subscription_subsynccommit - 1] = true;
1542 : }
1543 :
1544 136 : if (IsSet(opts.specified_opts, SUBOPT_BINARY))
1545 : {
1546 10 : values[Anum_pg_subscription_subbinary - 1] =
1547 10 : BoolGetDatum(opts.binary);
1548 10 : replaces[Anum_pg_subscription_subbinary - 1] = true;
1549 : }
1550 :
1551 136 : if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
1552 : {
1553 18 : values[Anum_pg_subscription_substream - 1] =
1554 18 : CharGetDatum(opts.streaming);
1555 18 : replaces[Anum_pg_subscription_substream - 1] = true;
1556 : }
1557 :
1558 136 : if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
1559 : {
1560 : values[Anum_pg_subscription_subdisableonerr - 1]
1561 4 : = BoolGetDatum(opts.disableonerr);
1562 : replaces[Anum_pg_subscription_subdisableonerr - 1]
1563 4 : = true;
1564 : }
1565 :
1566 136 : if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
1567 : {
1568 : /* Non-superuser may not disable password_required. */
1569 8 : if (!opts.passwordrequired && !superuser())
1570 0 : ereport(ERROR,
1571 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1572 : errmsg("password_required=false is superuser-only"),
1573 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1574 :
1575 : values[Anum_pg_subscription_subpasswordrequired - 1]
1576 8 : = BoolGetDatum(opts.passwordrequired);
1577 : replaces[Anum_pg_subscription_subpasswordrequired - 1]
1578 8 : = true;
1579 : }
1580 :
1581 136 : if (IsSet(opts.specified_opts, SUBOPT_RUN_AS_OWNER))
1582 : {
1583 9 : values[Anum_pg_subscription_subrunasowner - 1] =
1584 9 : BoolGetDatum(opts.runasowner);
1585 9 : replaces[Anum_pg_subscription_subrunasowner - 1] = true;
1586 : }
1587 :
1588 136 : if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
1589 : {
1590 : /*
1591 : * We need to update both the slot and the subscription
1592 : * for the two_phase option. We can enable the two_phase
1593 : * option for a slot only once the initial data
1594 : * synchronization is done. This is to avoid missing some
1595 : * data as explained in comments atop worker.c.
1596 : */
1597 3 : update_two_phase = !opts.twophase;
1598 :
1599 3 : CheckAlterSubOption(sub, "two_phase", update_two_phase,
1600 : isTopLevel);
1601 :
1602 : /*
1603 : * Modifying the two_phase slot option requires a slot
1604 : * lookup by slot name, so changing the slot name at the
1605 : * same time is not allowed.
1606 : */
1607 3 : if (update_two_phase &&
1608 1 : IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1609 0 : ereport(ERROR,
1610 : (errcode(ERRCODE_SYNTAX_ERROR),
1611 : errmsg("\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
1612 :
1613 : /*
1614 : * Note that workers may still survive even if the
1615 : * subscription has been disabled.
1616 : *
1617 : * Ensure workers have already been exited to avoid
1618 : * getting prepared transactions while we are disabling
1619 : * the two_phase option. Otherwise, the changes of an
1620 : * already prepared transaction can be replicated again
1621 : * along with its corresponding commit, leading to
1622 : * duplicate data or errors.
1623 : */
1624 3 : if (logicalrep_workers_find(subid, true, true))
1625 0 : ereport(ERROR,
1626 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1627 : errmsg("cannot alter \"two_phase\" when logical replication worker is still running"),
1628 : errhint("Try again after some time.")));
1629 :
1630 : /*
1631 : * two_phase cannot be disabled if there are any
1632 : * uncommitted prepared transactions present otherwise it
1633 : * can lead to duplicate data or errors as explained in
1634 : * the comment above.
1635 : */
1636 3 : if (update_two_phase &&
1637 1 : sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
1638 1 : LookupGXactBySubid(subid))
1639 0 : ereport(ERROR,
1640 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1641 : errmsg("cannot disable \"two_phase\" when prepared transactions exist"),
1642 : errhint("Resolve these transactions and try again.")));
1643 :
1644 : /* Change system catalog accordingly */
1645 3 : values[Anum_pg_subscription_subtwophasestate - 1] =
1646 3 : CharGetDatum(opts.twophase ?
1647 : LOGICALREP_TWOPHASE_STATE_PENDING :
1648 : LOGICALREP_TWOPHASE_STATE_DISABLED);
1649 3 : replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1650 : }
1651 :
1652 136 : if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
1653 : {
1654 : /*
1655 : * Similar to the two_phase case above, we need to update
1656 : * the failover option for both the slot and the
1657 : * subscription.
1658 : */
1659 9 : update_failover = true;
1660 :
1661 9 : CheckAlterSubOption(sub, "failover", update_failover,
1662 : isTopLevel);
1663 :
1664 4 : values[Anum_pg_subscription_subfailover - 1] =
1665 4 : BoolGetDatum(opts.failover);
1666 4 : replaces[Anum_pg_subscription_subfailover - 1] = true;
1667 : }
1668 :
1669 131 : if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
1670 : {
1671 2 : values[Anum_pg_subscription_subretaindeadtuples - 1] =
1672 2 : BoolGetDatum(opts.retaindeadtuples);
1673 2 : replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true;
1674 :
1675 : /*
1676 : * Update the retention status only if there's a change in
1677 : * the retain_dead_tuples option value.
1678 : *
1679 : * Automatically marking retention as active when
1680 : * retain_dead_tuples is enabled may not always be ideal,
1681 : * especially if retention was previously stopped and the
1682 : * user toggles retain_dead_tuples without adjusting the
1683 : * publisher workload. However, this behavior provides a
1684 : * convenient way for users to manually refresh the
1685 : * retention status. Since retention will be stopped again
1686 : * unless the publisher workload is reduced, this approach
1687 : * is acceptable for now.
1688 : */
1689 2 : if (opts.retaindeadtuples != sub->retaindeadtuples)
1690 : {
1691 2 : values[Anum_pg_subscription_subretentionactive - 1] =
1692 2 : BoolGetDatum(opts.retaindeadtuples);
1693 2 : replaces[Anum_pg_subscription_subretentionactive - 1] = true;
1694 :
1695 2 : retention_active = opts.retaindeadtuples;
1696 : }
1697 :
1698 2 : CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
1699 :
1700 : /*
1701 : * Workers may continue running even after the
1702 : * subscription has been disabled.
1703 : *
1704 : * To prevent race conditions (as described in
1705 : * CheckAlterSubOption()), ensure that all worker
1706 : * processes have already exited before proceeding.
1707 : */
1708 1 : if (logicalrep_workers_find(subid, true, true))
1709 0 : ereport(ERROR,
1710 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1711 : errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
1712 : errhint("Try again after some time.")));
1713 :
1714 : /*
1715 : * Notify the launcher to manage the replication slot for
1716 : * conflict detection. This ensures that replication slot
1717 : * is efficiently handled (created, updated, or dropped)
1718 : * in response to any configuration changes.
1719 : */
1720 1 : ApplyLauncherWakeupAtCommit();
1721 :
1722 1 : check_pub_rdt = opts.retaindeadtuples;
1723 1 : retain_dead_tuples = opts.retaindeadtuples;
1724 : }
1725 :
1726 130 : if (IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1727 : {
1728 6 : values[Anum_pg_subscription_submaxretention - 1] =
1729 6 : Int32GetDatum(opts.maxretention);
1730 6 : replaces[Anum_pg_subscription_submaxretention - 1] = true;
1731 :
1732 6 : max_retention = opts.maxretention;
1733 : }
1734 :
1735 : /*
1736 : * Ensure that system configuration parameters are set
1737 : * appropriately to support retain_dead_tuples and
1738 : * max_retention_duration.
1739 : */
1740 130 : if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) ||
1741 129 : IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1742 7 : CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE,
1743 : retain_dead_tuples,
1744 : retention_active,
1745 7 : (max_retention > 0));
1746 :
1747 130 : if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
1748 : {
1749 6 : values[Anum_pg_subscription_suborigin - 1] =
1750 6 : CStringGetTextDatum(opts.origin);
1751 6 : replaces[Anum_pg_subscription_suborigin - 1] = true;
1752 :
1753 : /*
1754 : * Check if changes from different origins may be received
1755 : * from the publisher when the origin is changed to ANY
1756 : * and retain_dead_tuples is enabled. Use |= so that we
1757 : * don't clear the flag already set when
1758 : * retain_dead_tuples was changed in the same command.
1759 : */
1760 8 : check_pub_rdt |= retain_dead_tuples &&
1761 2 : pg_strcasecmp(opts.origin, LOGICALREP_ORIGIN_ANY) == 0;
1762 :
1763 6 : origin = opts.origin;
1764 : }
1765 :
1766 130 : if (IsSet(opts.specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
1767 : {
1768 8 : values[Anum_pg_subscription_subwalrcvtimeout - 1] =
1769 8 : CStringGetTextDatum(opts.wal_receiver_timeout);
1770 8 : replaces[Anum_pg_subscription_subwalrcvtimeout - 1] = true;
1771 : }
1772 :
1773 130 : update_tuple = true;
1774 130 : break;
1775 : }
1776 :
1777 58 : case ALTER_SUBSCRIPTION_ENABLED:
1778 : {
1779 58 : parse_subscription_options(pstate, stmt->options,
1780 : SUBOPT_ENABLED, &opts);
1781 : Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
1782 :
1783 58 : if (!sub->slotname && opts.enabled)
1784 4 : ereport(ERROR,
1785 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1786 : errmsg("cannot enable subscription that does not have a slot name")));
1787 :
1788 : /*
1789 : * Check track_commit_timestamp only when enabling the
1790 : * subscription in case it was disabled after creation. See
1791 : * comments atop CheckSubDeadTupleRetention() for details.
1792 : */
1793 54 : CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
1794 54 : WARNING, sub->retaindeadtuples,
1795 54 : sub->retentionactive, false);
1796 :
1797 54 : values[Anum_pg_subscription_subenabled - 1] =
1798 54 : BoolGetDatum(opts.enabled);
1799 54 : replaces[Anum_pg_subscription_subenabled - 1] = true;
1800 :
1801 54 : if (opts.enabled)
1802 30 : ApplyLauncherWakeupAtCommit();
1803 :
1804 54 : update_tuple = true;
1805 :
1806 : /*
1807 : * The subscription might be initially created with
1808 : * connect=false and retain_dead_tuples=true, meaning the
1809 : * remote server's status may not be checked. Ensure this
1810 : * check is conducted now.
1811 : */
1812 54 : check_pub_rdt = sub->retaindeadtuples && opts.enabled;
1813 54 : break;
1814 : }
1815 :
1816 1 : case ALTER_SUBSCRIPTION_SERVER:
1817 : {
1818 : ForeignServer *new_server;
1819 : ObjectAddress referenced;
1820 : AclResult aclresult;
1821 :
1822 : /*
1823 : * Remove what was there before, either another foreign server
1824 : * or a connection string.
1825 : */
1826 1 : if (form->subserver)
1827 : {
1828 0 : deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
1829 : DEPENDENCY_NORMAL,
1830 : ForeignServerRelationId, form->subserver);
1831 : }
1832 : else
1833 : {
1834 1 : nulls[Anum_pg_subscription_subconninfo - 1] = true;
1835 1 : replaces[Anum_pg_subscription_subconninfo - 1] = true;
1836 : }
1837 :
1838 : /*
1839 : * Check that the subscription owner has USAGE privileges on
1840 : * the server.
1841 : */
1842 1 : new_server = GetForeignServerByName(stmt->servername, false);
1843 1 : aclresult = object_aclcheck(ForeignServerRelationId,
1844 : new_server->serverid,
1845 : form->subowner, ACL_USAGE);
1846 1 : if (aclresult != ACLCHECK_OK)
1847 0 : ereport(ERROR,
1848 : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1849 : errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
1850 : GetUserNameFromId(form->subowner, false),
1851 : new_server->servername));
1852 :
1853 : /* make sure a user mapping exists */
1854 1 : GetUserMapping(form->subowner, new_server->serverid);
1855 :
1856 1 : new_conninfo = ForeignServerConnectionString(form->subowner,
1857 : new_server);
1858 :
1859 : /* Load the library providing us libpq calls. */
1860 1 : load_file("libpqwalreceiver", false);
1861 : /* Check the connection info string. */
1862 1 : walrcv_check_conninfo(new_conninfo,
1863 : sub->passwordrequired && !sub->ownersuperuser);
1864 :
1865 1 : values[Anum_pg_subscription_subserver - 1] = ObjectIdGetDatum(new_server->serverid);
1866 1 : replaces[Anum_pg_subscription_subserver - 1] = true;
1867 :
1868 1 : ObjectAddressSet(referenced, ForeignServerRelationId, new_server->serverid);
1869 1 : recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
1870 :
1871 1 : update_tuple = true;
1872 : }
1873 :
1874 : /*
1875 : * Since the remote server configuration might have changed,
1876 : * perform a check to ensure it permits enabling
1877 : * retain_dead_tuples.
1878 : */
1879 1 : check_pub_rdt = sub->retaindeadtuples;
1880 1 : break;
1881 :
1882 14 : case ALTER_SUBSCRIPTION_CONNECTION:
1883 : /* remove reference to foreign server and dependencies, if present */
1884 14 : if (form->subserver)
1885 : {
1886 1 : deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
1887 : DEPENDENCY_NORMAL,
1888 : ForeignServerRelationId, form->subserver);
1889 :
1890 1 : values[Anum_pg_subscription_subserver - 1] = ObjectIdGetDatum(InvalidOid);
1891 1 : replaces[Anum_pg_subscription_subserver - 1] = true;
1892 : }
1893 :
1894 14 : new_conninfo = stmt->conninfo;
1895 :
1896 : /* Load the library providing us libpq calls. */
1897 14 : load_file("libpqwalreceiver", false);
1898 : /* Check the connection info string. */
1899 14 : walrcv_check_conninfo(new_conninfo,
1900 : sub->passwordrequired && !sub->ownersuperuser);
1901 :
1902 10 : values[Anum_pg_subscription_subconninfo - 1] =
1903 10 : CStringGetTextDatum(stmt->conninfo);
1904 10 : replaces[Anum_pg_subscription_subconninfo - 1] = true;
1905 10 : update_tuple = true;
1906 :
1907 : /*
1908 : * Since the remote server configuration might have changed,
1909 : * perform a check to ensure it permits enabling
1910 : * retain_dead_tuples.
1911 : */
1912 10 : check_pub_rdt = sub->retaindeadtuples;
1913 10 : break;
1914 :
1915 19 : case ALTER_SUBSCRIPTION_SET_PUBLICATION:
1916 : {
1917 19 : supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
1918 19 : parse_subscription_options(pstate, stmt->options,
1919 : supported_opts, &opts);
1920 :
1921 19 : values[Anum_pg_subscription_subpublications - 1] =
1922 19 : publicationListToArray(stmt->publication);
1923 19 : replaces[Anum_pg_subscription_subpublications - 1] = true;
1924 :
1925 19 : update_tuple = true;
1926 :
1927 : /* Refresh if user asked us to. */
1928 19 : if (opts.refresh)
1929 : {
1930 15 : if (!sub->enabled)
1931 0 : ereport(ERROR,
1932 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1933 : errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1934 : errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
1935 :
1936 : /*
1937 : * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
1938 : * why this is not allowed.
1939 : */
1940 15 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1941 0 : ereport(ERROR,
1942 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1943 : errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1944 : errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1945 :
1946 15 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1947 :
1948 : /* Make sure refresh sees the new list of publications. */
1949 7 : sub->publications = stmt->publication;
1950 :
1951 7 : AlterSubscription_refresh(sub, opts.copy_data,
1952 : stmt->publication);
1953 : }
1954 :
1955 11 : break;
1956 : }
1957 :
1958 35 : case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
1959 : case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
1960 : {
1961 : List *publist;
1962 35 : bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
1963 :
1964 35 : supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
1965 35 : parse_subscription_options(pstate, stmt->options,
1966 : supported_opts, &opts);
1967 :
1968 35 : publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
1969 11 : values[Anum_pg_subscription_subpublications - 1] =
1970 11 : publicationListToArray(publist);
1971 11 : replaces[Anum_pg_subscription_subpublications - 1] = true;
1972 :
1973 11 : update_tuple = true;
1974 :
1975 : /* Refresh if user asked us to. */
1976 11 : if (opts.refresh)
1977 : {
1978 : /* We only need to validate user specified publications. */
1979 3 : List *validate_publications = (isadd) ? stmt->publication : NULL;
1980 :
1981 3 : if (!sub->enabled)
1982 0 : ereport(ERROR,
1983 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1984 : errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1985 : /* translator: %s is an SQL ALTER command */
1986 : errhint("Use %s instead.",
1987 : isadd ?
1988 : "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
1989 : "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
1990 :
1991 : /*
1992 : * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
1993 : * why this is not allowed.
1994 : */
1995 3 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1996 0 : ereport(ERROR,
1997 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1998 : errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1999 : /* translator: %s is an SQL ALTER command */
2000 : errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
2001 : isadd ?
2002 : "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
2003 : "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
2004 :
2005 3 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
2006 :
2007 : /* Refresh the new list of publications. */
2008 3 : sub->publications = publist;
2009 :
2010 3 : AlterSubscription_refresh(sub, opts.copy_data,
2011 : validate_publications);
2012 : }
2013 :
2014 11 : break;
2015 : }
2016 :
2017 37 : case ALTER_SUBSCRIPTION_REFRESH_PUBLICATION:
2018 : {
2019 37 : if (!sub->enabled)
2020 4 : ereport(ERROR,
2021 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2022 : errmsg("%s is not allowed for disabled subscriptions",
2023 : "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
2024 :
2025 33 : parse_subscription_options(pstate, stmt->options,
2026 : SUBOPT_COPY_DATA, &opts);
2027 :
2028 : /*
2029 : * The subscription option "two_phase" requires that
2030 : * replication has passed the initial table synchronization
2031 : * phase before the two_phase becomes properly enabled.
2032 : *
2033 : * But, having reached this two-phase commit "enabled" state
2034 : * we must not allow any subsequent table initialization to
2035 : * occur. So the ALTER SUBSCRIPTION ... REFRESH PUBLICATION is
2036 : * disallowed when the user had requested two_phase = on mode.
2037 : *
2038 : * The exception to this restriction is when copy_data =
2039 : * false, because when copy_data is false the tablesync will
2040 : * start already in READY state and will exit directly without
2041 : * doing anything.
2042 : *
2043 : * For more details see comments atop worker.c.
2044 : */
2045 33 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
2046 0 : ereport(ERROR,
2047 : (errcode(ERRCODE_SYNTAX_ERROR),
2048 : errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"),
2049 : errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
2050 :
2051 33 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION");
2052 :
2053 29 : AlterSubscription_refresh(sub, opts.copy_data, NULL);
2054 :
2055 28 : break;
2056 : }
2057 :
2058 2 : case ALTER_SUBSCRIPTION_REFRESH_SEQUENCES:
2059 : {
2060 2 : if (!sub->enabled)
2061 0 : ereport(ERROR,
2062 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2063 : errmsg("%s is not allowed for disabled subscriptions",
2064 : "ALTER SUBSCRIPTION ... REFRESH SEQUENCES"));
2065 :
2066 2 : AlterSubscription_refresh_seq(sub);
2067 :
2068 2 : break;
2069 : }
2070 :
2071 15 : case ALTER_SUBSCRIPTION_SKIP:
2072 : {
2073 15 : parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
2074 :
2075 : /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
2076 : Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
2077 :
2078 : /*
2079 : * If the user sets subskiplsn, we do a sanity check to make
2080 : * sure that the specified LSN is a probable value.
2081 : */
2082 11 : if (XLogRecPtrIsValid(opts.lsn))
2083 : {
2084 : ReplOriginId originid;
2085 : char originname[NAMEDATALEN];
2086 : XLogRecPtr remote_lsn;
2087 :
2088 7 : ReplicationOriginNameForLogicalRep(subid, InvalidOid,
2089 : originname, sizeof(originname));
2090 7 : originid = replorigin_by_name(originname, false);
2091 7 : remote_lsn = replorigin_get_progress(originid, false);
2092 :
2093 : /* Check the given LSN is at least a future LSN */
2094 7 : if (XLogRecPtrIsValid(remote_lsn) && opts.lsn < remote_lsn)
2095 0 : ereport(ERROR,
2096 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2097 : errmsg("skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
2098 : LSN_FORMAT_ARGS(opts.lsn),
2099 : LSN_FORMAT_ARGS(remote_lsn))));
2100 : }
2101 :
2102 11 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
2103 11 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
2104 :
2105 11 : update_tuple = true;
2106 11 : break;
2107 : }
2108 :
2109 0 : default:
2110 0 : elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
2111 : stmt->kind);
2112 : }
2113 :
2114 : /* Update the catalog if needed. */
2115 258 : if (update_tuple)
2116 : {
2117 228 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
2118 : replaces);
2119 :
2120 228 : CatalogTupleUpdate(rel, &tup->t_self, tup);
2121 :
2122 228 : heap_freetuple(tup);
2123 : }
2124 :
2125 : /*
2126 : * Try to acquire the connection necessary either for modifying the slot
2127 : * or for checking if the remote server permits enabling
2128 : * retain_dead_tuples.
2129 : *
2130 : * This has to be at the end because otherwise if there is an error while
2131 : * doing the database operations we won't be able to rollback altered
2132 : * slot.
2133 : */
2134 258 : if (update_failover || update_two_phase || check_pub_rdt)
2135 : {
2136 : bool must_use_password;
2137 : char *err;
2138 : WalReceiverConn *wrconn;
2139 :
2140 : /* Load the library providing us libpq calls. */
2141 13 : load_file("libpqwalreceiver", false);
2142 :
2143 : /*
2144 : * Try to connect to the publisher, using the new connection string if
2145 : * available.
2146 : */
2147 13 : must_use_password = sub->passwordrequired && !sub->ownersuperuser;
2148 13 : wrconn = walrcv_connect(new_conninfo ? new_conninfo : sub->conninfo,
2149 : true, true, must_use_password, sub->name,
2150 : &err);
2151 13 : if (!wrconn)
2152 0 : ereport(ERROR,
2153 : (errcode(ERRCODE_CONNECTION_FAILURE),
2154 : errmsg("subscription \"%s\" could not connect to the publisher: %s",
2155 : sub->name, err)));
2156 :
2157 13 : PG_TRY();
2158 : {
2159 13 : if (retain_dead_tuples)
2160 9 : check_pub_dead_tuple_retention(wrconn);
2161 :
2162 13 : check_publications_origin_tables(wrconn, sub->publications, false,
2163 : retain_dead_tuples, origin, NULL, 0,
2164 : sub->name);
2165 :
2166 13 : if (update_failover || update_two_phase)
2167 5 : walrcv_alter_slot(wrconn, sub->slotname,
2168 : update_failover ? &opts.failover : NULL,
2169 : update_two_phase ? &opts.twophase : NULL);
2170 : }
2171 0 : PG_FINALLY();
2172 : {
2173 13 : walrcv_disconnect(wrconn);
2174 : }
2175 13 : PG_END_TRY();
2176 : }
2177 :
2178 258 : table_close(rel, RowExclusiveLock);
2179 :
2180 258 : InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
2181 :
2182 : /* Wake up related replication workers to handle this change quickly. */
2183 258 : LogicalRepWorkersWakeupAtCommit(subid);
2184 :
2185 258 : return myself;
2186 : }
2187 :
2188 : /*
2189 : * Drop a subscription
2190 : */
2191 : void
2192 155 : DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
2193 : {
2194 : Relation rel;
2195 : ObjectAddress myself;
2196 : HeapTuple tup;
2197 : Oid subid;
2198 : Oid subowner;
2199 : Datum datum;
2200 : bool isnull;
2201 : char *subname;
2202 : char *conninfo;
2203 : char *slotname;
2204 : List *subworkers;
2205 : ListCell *lc;
2206 : char originname[NAMEDATALEN];
2207 155 : char *err = NULL;
2208 155 : WalReceiverConn *wrconn = NULL;
2209 : Form_pg_subscription form;
2210 : List *rstates;
2211 : bool must_use_password;
2212 :
2213 : /*
2214 : * The launcher may concurrently start a new worker for this subscription.
2215 : * During initialization, the worker checks for subscription validity and
2216 : * exits if the subscription has already been dropped. See
2217 : * InitializeLogRepWorker.
2218 : */
2219 155 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
2220 :
2221 155 : tup = SearchSysCache2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
2222 155 : CStringGetDatum(stmt->subname));
2223 :
2224 155 : if (!HeapTupleIsValid(tup))
2225 : {
2226 8 : table_close(rel, NoLock);
2227 :
2228 8 : if (!stmt->missing_ok)
2229 4 : ereport(ERROR,
2230 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2231 : errmsg("subscription \"%s\" does not exist",
2232 : stmt->subname)));
2233 : else
2234 4 : ereport(NOTICE,
2235 : (errmsg("subscription \"%s\" does not exist, skipping",
2236 : stmt->subname)));
2237 :
2238 65 : return;
2239 : }
2240 :
2241 147 : form = (Form_pg_subscription) GETSTRUCT(tup);
2242 147 : subid = form->oid;
2243 147 : subowner = form->subowner;
2244 147 : must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
2245 :
2246 : /* must be owner */
2247 147 : if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
2248 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
2249 0 : stmt->subname);
2250 :
2251 : /* DROP hook for the subscription being removed */
2252 147 : InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
2253 :
2254 : /*
2255 : * Lock the subscription so nobody else can do anything with it (including
2256 : * the replication workers).
2257 : */
2258 147 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
2259 :
2260 : /* Get subname */
2261 147 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
2262 : Anum_pg_subscription_subname);
2263 147 : subname = pstrdup(NameStr(*DatumGetName(datum)));
2264 :
2265 : /* Get conninfo */
2266 147 : if (OidIsValid(form->subserver))
2267 : {
2268 : AclResult aclresult;
2269 : ForeignServer *server;
2270 :
2271 9 : server = GetForeignServer(form->subserver);
2272 9 : aclresult = object_aclcheck(ForeignServerRelationId, form->subserver,
2273 : form->subowner, ACL_USAGE);
2274 9 : if (aclresult != ACLCHECK_OK)
2275 : {
2276 : /*
2277 : * Unable to generate connection string because permissions on the
2278 : * foreign server have been removed. Follow the same logic as an
2279 : * unusable subconninfo (which will result in an ERROR later
2280 : * unless slot_name = NONE).
2281 : */
2282 8 : err = psprintf(_("subscription owner \"%s\" does not have permission on foreign server \"%s\""),
2283 : GetUserNameFromId(form->subowner, false),
2284 : server->servername);
2285 8 : conninfo = NULL;
2286 : }
2287 : else
2288 1 : conninfo = ForeignServerConnectionString(form->subowner,
2289 : server);
2290 : }
2291 : else
2292 : {
2293 138 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
2294 : Anum_pg_subscription_subconninfo);
2295 138 : conninfo = TextDatumGetCString(datum);
2296 : }
2297 :
2298 : /* Get slotname */
2299 147 : datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
2300 : Anum_pg_subscription_subslotname, &isnull);
2301 147 : if (!isnull)
2302 86 : slotname = pstrdup(NameStr(*DatumGetName(datum)));
2303 : else
2304 61 : slotname = NULL;
2305 :
2306 : /*
2307 : * Since dropping a replication slot is not transactional, the replication
2308 : * slot stays dropped even if the transaction rolls back. So we cannot
2309 : * run DROP SUBSCRIPTION inside a transaction block if dropping the
2310 : * replication slot. Also, in this case, we report a message for dropping
2311 : * the subscription to the cumulative stats system.
2312 : *
2313 : * XXX The command name should really be something like "DROP SUBSCRIPTION
2314 : * of a subscription that is associated with a replication slot", but we
2315 : * don't have the proper facilities for that.
2316 : */
2317 147 : if (slotname)
2318 86 : PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
2319 :
2320 143 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
2321 143 : EventTriggerSQLDropAddObject(&myself, true, true);
2322 :
2323 : /* Remove the tuple from catalog. */
2324 143 : CatalogTupleDelete(rel, &tup->t_self);
2325 :
2326 143 : ReleaseSysCache(tup);
2327 :
2328 : /*
2329 : * Stop all the subscription workers immediately.
2330 : *
2331 : * This is necessary if we are dropping the replication slot, so that the
2332 : * slot becomes accessible.
2333 : *
2334 : * It is also necessary if the subscription is disabled and was disabled
2335 : * in the same transaction. Then the workers haven't seen the disabling
2336 : * yet and will still be running, leading to hangs later when we want to
2337 : * drop the replication origin. If the subscription was disabled before
2338 : * this transaction, then there shouldn't be any workers left, so this
2339 : * won't make a difference.
2340 : *
2341 : * New workers won't be started because we hold an exclusive lock on the
2342 : * subscription till the end of the transaction.
2343 : */
2344 143 : subworkers = logicalrep_workers_find(subid, false, true);
2345 224 : foreach(lc, subworkers)
2346 : {
2347 81 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
2348 :
2349 81 : logicalrep_worker_stop(w->type, w->subid, w->relid);
2350 : }
2351 143 : list_free(subworkers);
2352 :
2353 : /*
2354 : * Remove the no-longer-useful entry in the launcher's table of apply
2355 : * worker start times.
2356 : *
2357 : * If this transaction rolls back, the launcher might restart a failed
2358 : * apply worker before wal_retrieve_retry_interval milliseconds have
2359 : * elapsed, but that's pretty harmless.
2360 : */
2361 143 : ApplyLauncherForgetWorkerStartTime(subid);
2362 :
2363 : /*
2364 : * Cleanup of tablesync replication origins.
2365 : *
2366 : * Any READY-state relations would already have dealt with clean-ups.
2367 : *
2368 : * Note that the state can't change because we have already stopped both
2369 : * the apply and tablesync workers and they can't restart because of
2370 : * exclusive lock on the subscription.
2371 : */
2372 143 : rstates = GetSubscriptionRelations(subid, true, false, true);
2373 148 : foreach(lc, rstates)
2374 : {
2375 5 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
2376 5 : Oid relid = rstate->relid;
2377 :
2378 : /* Only cleanup resources of tablesync workers */
2379 5 : if (!OidIsValid(relid))
2380 0 : continue;
2381 :
2382 : /*
2383 : * Drop the tablesync's origin tracking if exists.
2384 : *
2385 : * It is possible that the origin is not yet created for tablesync
2386 : * worker so passing missing_ok = true. This can happen for the states
2387 : * before SUBREL_STATE_DATASYNC.
2388 : */
2389 5 : ReplicationOriginNameForLogicalRep(subid, relid, originname,
2390 : sizeof(originname));
2391 5 : replorigin_drop_by_name(originname, true, false);
2392 : }
2393 :
2394 : /* Clean up dependencies */
2395 143 : deleteDependencyRecordsFor(SubscriptionRelationId, subid, false);
2396 143 : deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
2397 :
2398 : /* Remove any associated relation synchronization states. */
2399 143 : RemoveSubscriptionRel(subid, InvalidOid);
2400 :
2401 : /* Remove the origin tracking if exists. */
2402 143 : ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
2403 143 : replorigin_drop_by_name(originname, true, false);
2404 :
2405 : /*
2406 : * Tell the cumulative stats system that the subscription is getting
2407 : * dropped.
2408 : */
2409 143 : pgstat_drop_subscription(subid);
2410 :
2411 : /*
2412 : * If there is no slot associated with the subscription, we can finish
2413 : * here.
2414 : */
2415 143 : if (!slotname && rstates == NIL)
2416 : {
2417 61 : table_close(rel, NoLock);
2418 61 : return;
2419 : }
2420 :
2421 : /*
2422 : * Try to acquire the connection necessary for dropping slots.
2423 : *
2424 : * Note: If the slotname is NONE/NULL then we allow the command to finish
2425 : * and users need to manually cleanup the apply and tablesync worker slots
2426 : * later.
2427 : *
2428 : * This has to be at the end because otherwise if there is an error while
2429 : * doing the database operations we won't be able to rollback dropped
2430 : * slot.
2431 : */
2432 82 : load_file("libpqwalreceiver", false);
2433 :
2434 82 : if (conninfo)
2435 78 : wrconn = walrcv_connect(conninfo, true, true, must_use_password,
2436 : subname, &err);
2437 :
2438 82 : if (wrconn == NULL)
2439 : {
2440 4 : if (!slotname)
2441 : {
2442 : /* be tidy */
2443 0 : list_free(rstates);
2444 0 : table_close(rel, NoLock);
2445 0 : return;
2446 : }
2447 : else
2448 : {
2449 4 : ReportSlotConnectionError(rstates, subid, slotname, err);
2450 : }
2451 : }
2452 :
2453 78 : PG_TRY();
2454 : {
2455 83 : foreach(lc, rstates)
2456 : {
2457 5 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
2458 5 : Oid relid = rstate->relid;
2459 :
2460 : /* Only cleanup resources of tablesync workers */
2461 5 : if (!OidIsValid(relid))
2462 0 : continue;
2463 :
2464 : /*
2465 : * Drop the tablesync slots associated with removed tables.
2466 : *
2467 : * For SYNCDONE/READY states, the tablesync slot is known to have
2468 : * already been dropped by the tablesync worker.
2469 : *
2470 : * For other states, there is no certainty, maybe the slot does
2471 : * not exist yet. Also, if we fail after removing some of the
2472 : * slots, next time, it will again try to drop already dropped
2473 : * slots and fail. For these reasons, we allow missing_ok = true
2474 : * for the drop.
2475 : */
2476 5 : if (rstate->state != SUBREL_STATE_SYNCDONE)
2477 : {
2478 4 : char syncslotname[NAMEDATALEN] = {0};
2479 :
2480 4 : ReplicationSlotNameForTablesync(subid, relid, syncslotname,
2481 : sizeof(syncslotname));
2482 4 : ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
2483 : }
2484 : }
2485 :
2486 78 : list_free(rstates);
2487 :
2488 : /*
2489 : * If there is a slot associated with the subscription, then drop the
2490 : * replication slot at the publisher.
2491 : */
2492 78 : if (slotname)
2493 78 : ReplicationSlotDropAtPubNode(wrconn, slotname, false);
2494 : }
2495 1 : PG_FINALLY();
2496 : {
2497 78 : walrcv_disconnect(wrconn);
2498 : }
2499 78 : PG_END_TRY();
2500 :
2501 77 : table_close(rel, NoLock);
2502 : }
2503 :
2504 : /*
2505 : * Drop the replication slot at the publisher node using the replication
2506 : * connection.
2507 : *
2508 : * missing_ok - if true then only issue a LOG message if the slot doesn't
2509 : * exist.
2510 : */
2511 : void
2512 290 : ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
2513 : {
2514 : StringInfoData cmd;
2515 :
2516 : Assert(wrconn);
2517 :
2518 290 : load_file("libpqwalreceiver", false);
2519 :
2520 290 : initStringInfo(&cmd);
2521 290 : appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
2522 :
2523 290 : PG_TRY();
2524 : {
2525 : WalRcvExecResult *res;
2526 :
2527 290 : res = walrcv_exec(wrconn, cmd.data, 0, NULL);
2528 :
2529 290 : if (res->status == WALRCV_OK_COMMAND)
2530 : {
2531 : /* NOTICE. Success. */
2532 289 : ereport(NOTICE,
2533 : (errmsg("dropped replication slot \"%s\" on publisher",
2534 : slotname)));
2535 : }
2536 1 : else if (res->status == WALRCV_ERROR &&
2537 0 : missing_ok &&
2538 0 : res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
2539 : {
2540 : /* LOG. Error, but missing_ok = true. */
2541 0 : ereport(LOG,
2542 : (errmsg("could not drop replication slot \"%s\" on publisher: %s",
2543 : slotname, res->err)));
2544 : }
2545 : else
2546 : {
2547 : /* ERROR. */
2548 1 : ereport(ERROR,
2549 : (errcode(ERRCODE_CONNECTION_FAILURE),
2550 : errmsg("could not drop replication slot \"%s\" on publisher: %s",
2551 : slotname, res->err)));
2552 : }
2553 :
2554 289 : walrcv_clear_result(res);
2555 : }
2556 1 : PG_FINALLY();
2557 : {
2558 290 : pfree(cmd.data);
2559 : }
2560 290 : PG_END_TRY();
2561 289 : }
2562 :
2563 : /*
2564 : * Internal workhorse for changing a subscription owner
2565 : */
2566 : static void
2567 11 : AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
2568 : {
2569 : Form_pg_subscription form;
2570 : AclResult aclresult;
2571 :
2572 11 : form = (Form_pg_subscription) GETSTRUCT(tup);
2573 :
2574 11 : if (form->subowner == newOwnerId)
2575 2 : return;
2576 :
2577 9 : if (!object_ownercheck(SubscriptionRelationId, form->oid, GetUserId()))
2578 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
2579 0 : NameStr(form->subname));
2580 :
2581 : /*
2582 : * Don't allow non-superuser modification of a subscription with
2583 : * password_required=false.
2584 : */
2585 9 : if (!form->subpasswordrequired && !superuser())
2586 0 : ereport(ERROR,
2587 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2588 : errmsg("password_required=false is superuser-only"),
2589 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
2590 :
2591 : /* Must be able to become new owner */
2592 9 : check_can_set_role(GetUserId(), newOwnerId);
2593 :
2594 : /*
2595 : * current owner must have CREATE on database
2596 : *
2597 : * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
2598 : * other object types behave differently (e.g. you can't give a table to a
2599 : * user who lacks CREATE privileges on a schema).
2600 : */
2601 5 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
2602 : GetUserId(), ACL_CREATE);
2603 5 : if (aclresult != ACLCHECK_OK)
2604 0 : aclcheck_error(aclresult, OBJECT_DATABASE,
2605 0 : get_database_name(MyDatabaseId));
2606 :
2607 : /*
2608 : * If the subscription uses a server, check that the new owner has USAGE
2609 : * privileges on the server and that a user mapping exists. Note: does not
2610 : * re-check the resulting connection string.
2611 : */
2612 5 : if (OidIsValid(form->subserver))
2613 : {
2614 0 : ForeignServer *server = GetForeignServer(form->subserver);
2615 :
2616 0 : aclresult = object_aclcheck(ForeignServerRelationId, server->serverid, newOwnerId, ACL_USAGE);
2617 0 : if (aclresult != ACLCHECK_OK)
2618 0 : ereport(ERROR,
2619 : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2620 : errmsg("new subscription owner \"%s\" does not have permission on foreign server \"%s\"",
2621 : GetUserNameFromId(newOwnerId, false),
2622 : server->servername));
2623 :
2624 : /* make sure a user mapping exists */
2625 0 : GetUserMapping(newOwnerId, server->serverid);
2626 : }
2627 :
2628 5 : form->subowner = newOwnerId;
2629 5 : CatalogTupleUpdate(rel, &tup->t_self, tup);
2630 :
2631 : /* Update owner dependency reference */
2632 5 : changeDependencyOnOwner(SubscriptionRelationId,
2633 : form->oid,
2634 : newOwnerId);
2635 :
2636 5 : InvokeObjectPostAlterHook(SubscriptionRelationId,
2637 : form->oid, 0);
2638 :
2639 : /* Wake up related background processes to handle this change quickly. */
2640 5 : ApplyLauncherWakeupAtCommit();
2641 5 : LogicalRepWorkersWakeupAtCommit(form->oid);
2642 : }
2643 :
2644 : /*
2645 : * Change subscription owner -- by name
2646 : */
2647 : ObjectAddress
2648 11 : AlterSubscriptionOwner(const char *name, Oid newOwnerId)
2649 : {
2650 : Oid subid;
2651 : HeapTuple tup;
2652 : Relation rel;
2653 : ObjectAddress address;
2654 : Form_pg_subscription form;
2655 :
2656 11 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
2657 :
2658 11 : tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
2659 : CStringGetDatum(name));
2660 :
2661 11 : if (!HeapTupleIsValid(tup))
2662 0 : ereport(ERROR,
2663 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2664 : errmsg("subscription \"%s\" does not exist", name)));
2665 :
2666 11 : form = (Form_pg_subscription) GETSTRUCT(tup);
2667 11 : subid = form->oid;
2668 :
2669 11 : AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
2670 :
2671 7 : ObjectAddressSet(address, SubscriptionRelationId, subid);
2672 :
2673 7 : heap_freetuple(tup);
2674 :
2675 7 : table_close(rel, RowExclusiveLock);
2676 :
2677 7 : return address;
2678 : }
2679 :
2680 : /*
2681 : * Change subscription owner -- by OID
2682 : */
2683 : void
2684 0 : AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
2685 : {
2686 : HeapTuple tup;
2687 : Relation rel;
2688 :
2689 0 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
2690 :
2691 0 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
2692 :
2693 0 : if (!HeapTupleIsValid(tup))
2694 0 : ereport(ERROR,
2695 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2696 : errmsg("subscription with OID %u does not exist", subid)));
2697 :
2698 0 : AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
2699 :
2700 0 : heap_freetuple(tup);
2701 :
2702 0 : table_close(rel, RowExclusiveLock);
2703 0 : }
2704 :
2705 : /*
2706 : * Check and log a warning if the publisher has subscribed to the same table,
2707 : * its partition ancestors (if it's a partition), or its partition children (if
2708 : * it's a partitioned table), from some other publishers. This check is
2709 : * required in the following scenarios:
2710 : *
2711 : * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
2712 : * statements with "copy_data = true" and "origin = none":
2713 : * - Warn the user that data with an origin might have been copied.
2714 : * - This check is skipped for tables already added, as incremental sync via
2715 : * WAL allows origin tracking. The list of such tables is in
2716 : * subrel_local_oids.
2717 : *
2718 : * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
2719 : * statements with "retain_dead_tuples = true" and "origin = any", and for
2720 : * ALTER SUBSCRIPTION statements that modify retain_dead_tuples or origin,
2721 : * or when the publisher's status changes (e.g., due to a connection string
2722 : * update):
2723 : * - Warn the user that only conflict detection info for local changes on
2724 : * the publisher is retained. Data from other origins may lack sufficient
2725 : * details for reliable conflict detection.
2726 : * - See comments atop worker.c for more details.
2727 : */
2728 : static void
2729 177 : check_publications_origin_tables(WalReceiverConn *wrconn, List *publications,
2730 : bool copydata, bool retain_dead_tuples,
2731 : char *origin, Oid *subrel_local_oids,
2732 : int subrel_count, char *subname)
2733 : {
2734 : WalRcvExecResult *res;
2735 : StringInfoData cmd;
2736 : TupleTableSlot *slot;
2737 177 : Oid tableRow[1] = {TEXTOID};
2738 177 : List *publist = NIL;
2739 : int i;
2740 : bool check_rdt;
2741 : bool check_table_sync;
2742 354 : bool origin_none = origin &&
2743 177 : pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0;
2744 :
2745 : /*
2746 : * Enable retain_dead_tuples checks only when origin is set to 'any',
2747 : * since with origin='none' only local changes are replicated to the
2748 : * subscriber.
2749 : */
2750 177 : check_rdt = retain_dead_tuples && !origin_none;
2751 :
2752 : /*
2753 : * Enable table synchronization checks only when origin is 'none', to
2754 : * ensure that data from other origins is not inadvertently copied.
2755 : */
2756 177 : check_table_sync = copydata && origin_none;
2757 :
2758 : /* retain_dead_tuples and table sync checks occur separately */
2759 : Assert(!(check_rdt && check_table_sync));
2760 :
2761 : /* Return if no checks are required */
2762 177 : if (!check_rdt && !check_table_sync)
2763 163 : return;
2764 :
2765 14 : initStringInfo(&cmd);
2766 14 : appendStringInfoString(&cmd,
2767 : "SELECT DISTINCT P.pubname AS pubname\n"
2768 : "FROM pg_publication P,\n"
2769 : " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
2770 : " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid OR"
2771 : " GPT.relid IN (SELECT relid FROM pg_partition_ancestors(PS.srrelid) UNION"
2772 : " SELECT relid FROM pg_partition_tree(PS.srrelid))),\n"
2773 : " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2774 : "WHERE C.oid = GPT.relid AND P.pubname IN (");
2775 14 : GetPublicationsStr(publications, &cmd, true);
2776 14 : appendStringInfoString(&cmd, ")\n");
2777 :
2778 : /*
2779 : * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
2780 : * subrel_local_oids contains the list of relation oids that are already
2781 : * present on the subscriber. This check should be skipped for these
2782 : * tables if checking for table sync scenario. However, when handling the
2783 : * retain_dead_tuples scenario, ensure all tables are checked, as some
2784 : * existing tables may now include changes from other origins due to newly
2785 : * created subscriptions on the publisher.
2786 : */
2787 14 : if (check_table_sync)
2788 : {
2789 14 : for (i = 0; i < subrel_count; i++)
2790 : {
2791 4 : Oid relid = subrel_local_oids[i];
2792 4 : char *schemaname = get_namespace_name(get_rel_namespace(relid));
2793 4 : char *tablename = get_rel_name(relid);
2794 4 : char *schemaname_lit = quote_literal_cstr(schemaname);
2795 4 : char *tablename_lit = quote_literal_cstr(tablename);
2796 :
2797 4 : appendStringInfo(&cmd, "AND NOT (N.nspname = %s AND C.relname = %s)\n",
2798 : schemaname_lit, tablename_lit);
2799 :
2800 4 : pfree(schemaname_lit);
2801 4 : pfree(tablename_lit);
2802 : }
2803 : }
2804 :
2805 14 : res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
2806 14 : pfree(cmd.data);
2807 :
2808 14 : if (res->status != WALRCV_OK_TUPLES)
2809 0 : ereport(ERROR,
2810 : (errcode(ERRCODE_CONNECTION_FAILURE),
2811 : errmsg("could not receive list of replicated tables from the publisher: %s",
2812 : res->err)));
2813 :
2814 : /* Process publications. */
2815 14 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2816 19 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2817 : {
2818 : char *pubname;
2819 : bool isnull;
2820 :
2821 5 : pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2822 : Assert(!isnull);
2823 :
2824 5 : ExecClearTuple(slot);
2825 5 : publist = list_append_unique(publist, makeString(pubname));
2826 : }
2827 :
2828 : /*
2829 : * Log a warning if the publisher has subscribed to the same table from
2830 : * some other publisher. We cannot know the origin of data during the
2831 : * initial sync. Data origins can be found only from the WAL by looking at
2832 : * the origin id.
2833 : *
2834 : * XXX: For simplicity, we don't check whether the table has any data or
2835 : * not. If the table doesn't have any data then we don't need to
2836 : * distinguish between data having origin and data not having origin so we
2837 : * can avoid logging a warning for table sync scenario.
2838 : */
2839 14 : if (publist)
2840 : {
2841 : StringInfoData pubnames;
2842 :
2843 : /* Prepare the list of publication(s) for warning message. */
2844 5 : initStringInfo(&pubnames);
2845 5 : GetPublicationsStr(publist, &pubnames, false);
2846 :
2847 5 : if (check_table_sync)
2848 4 : ereport(WARNING,
2849 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2850 : errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
2851 : subname),
2852 : errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2853 : "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2854 : list_length(publist), pubnames.data),
2855 : errhint("Verify that initial data copied from the publisher tables did not come from other origins."));
2856 : else
2857 1 : ereport(WARNING,
2858 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2859 : errmsg("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins",
2860 : subname),
2861 : errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2862 : "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2863 : list_length(publist), pubnames.data),
2864 : errhint("Consider using origin = NONE or disabling retain_dead_tuples."));
2865 : }
2866 :
2867 14 : ExecDropSingleTupleTableSlot(slot);
2868 :
2869 14 : walrcv_clear_result(res);
2870 : }
2871 :
2872 : /*
2873 : * This function is similar to check_publications_origin_tables and serves
2874 : * same purpose for sequences.
2875 : */
2876 : static void
2877 166 : check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications,
2878 : bool copydata, char *origin,
2879 : Oid *subrel_local_oids, int subrel_count,
2880 : char *subname)
2881 : {
2882 : WalRcvExecResult *res;
2883 : StringInfoData cmd;
2884 : TupleTableSlot *slot;
2885 166 : Oid tableRow[1] = {TEXTOID};
2886 166 : List *publist = NIL;
2887 :
2888 : /*
2889 : * Enable sequence synchronization checks only when origin is 'none' , to
2890 : * ensure that sequence data from other origins is not inadvertently
2891 : * copied. This check is necessary if the publisher is running PG19 or
2892 : * later, where logical replication sequence synchronization is supported.
2893 : */
2894 176 : if (!copydata || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0 ||
2895 10 : walrcv_server_version(wrconn) < 190000)
2896 156 : return;
2897 :
2898 10 : initStringInfo(&cmd);
2899 10 : appendStringInfoString(&cmd,
2900 : "SELECT DISTINCT P.pubname AS pubname\n"
2901 : "FROM pg_publication P,\n"
2902 : " LATERAL pg_get_publication_sequences(P.pubname) GPS\n"
2903 : " JOIN pg_subscription_rel PS ON (GPS.relid = PS.srrelid),\n"
2904 : " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2905 : "WHERE C.oid = GPS.relid AND P.pubname IN (");
2906 :
2907 10 : GetPublicationsStr(publications, &cmd, true);
2908 10 : appendStringInfoString(&cmd, ")\n");
2909 :
2910 : /*
2911 : * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
2912 : * subrel_local_oids contains the list of relations that are already
2913 : * present on the subscriber. This check should be skipped as these will
2914 : * not be re-synced.
2915 : */
2916 10 : for (int i = 0; i < subrel_count; i++)
2917 : {
2918 0 : Oid relid = subrel_local_oids[i];
2919 0 : char *schemaname = get_namespace_name(get_rel_namespace(relid));
2920 0 : char *seqname = get_rel_name(relid);
2921 0 : char *schemaname_lit = quote_literal_cstr(schemaname);
2922 0 : char *seqname_lit = quote_literal_cstr(seqname);
2923 :
2924 0 : appendStringInfo(&cmd,
2925 : "AND NOT (N.nspname = %s AND C.relname = %s)\n",
2926 : schemaname_lit, seqname_lit);
2927 :
2928 0 : pfree(schemaname_lit);
2929 0 : pfree(seqname_lit);
2930 : }
2931 :
2932 10 : res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
2933 10 : pfree(cmd.data);
2934 :
2935 10 : if (res->status != WALRCV_OK_TUPLES)
2936 0 : ereport(ERROR,
2937 : (errcode(ERRCODE_CONNECTION_FAILURE),
2938 : errmsg("could not receive list of replicated sequences from the publisher: %s",
2939 : res->err)));
2940 :
2941 : /* Process publications. */
2942 10 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2943 10 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2944 : {
2945 : char *pubname;
2946 : bool isnull;
2947 :
2948 0 : pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2949 : Assert(!isnull);
2950 :
2951 0 : ExecClearTuple(slot);
2952 0 : publist = list_append_unique(publist, makeString(pubname));
2953 : }
2954 :
2955 : /*
2956 : * Log a warning if the publisher has subscribed to the same sequence from
2957 : * some other publisher. We cannot know the origin of sequences data
2958 : * during the initial sync.
2959 : */
2960 10 : if (publist)
2961 : {
2962 : StringInfoData pubnames;
2963 :
2964 : /* Prepare the list of publication(s) for warning message. */
2965 0 : initStringInfo(&pubnames);
2966 0 : GetPublicationsStr(publist, &pubnames, false);
2967 :
2968 0 : ereport(WARNING,
2969 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2970 : errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
2971 : subname),
2972 : errdetail_plural("The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.",
2973 : "The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.",
2974 : list_length(publist), pubnames.data),
2975 : errhint("Verify that initial data copied from the publisher sequences did not come from other origins."));
2976 : }
2977 :
2978 10 : ExecDropSingleTupleTableSlot(slot);
2979 :
2980 10 : walrcv_clear_result(res);
2981 : }
2982 :
2983 : /*
2984 : * Determine whether the retain_dead_tuples can be enabled based on the
2985 : * publisher's status.
2986 : *
2987 : * This option is disallowed if the publisher is running a version earlier
2988 : * than the PG19, or if the publisher is in recovery (i.e., it is a standby
2989 : * server).
2990 : *
2991 : * See comments atop worker.c for a detailed explanation.
2992 : */
2993 : static void
2994 12 : check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
2995 : {
2996 : WalRcvExecResult *res;
2997 12 : Oid RecoveryRow[1] = {BOOLOID};
2998 : TupleTableSlot *slot;
2999 : bool isnull;
3000 : bool remote_in_recovery;
3001 :
3002 12 : if (walrcv_server_version(wrconn) < 190000)
3003 0 : ereport(ERROR,
3004 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3005 : errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
3006 :
3007 12 : res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow);
3008 :
3009 12 : if (res->status != WALRCV_OK_TUPLES)
3010 0 : ereport(ERROR,
3011 : (errcode(ERRCODE_CONNECTION_FAILURE),
3012 : errmsg("could not obtain recovery progress from the publisher: %s",
3013 : res->err)));
3014 :
3015 12 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
3016 12 : if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
3017 0 : elog(ERROR, "failed to fetch tuple for the recovery progress");
3018 :
3019 12 : remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull));
3020 :
3021 12 : if (remote_in_recovery)
3022 0 : ereport(ERROR,
3023 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3024 : errmsg("cannot enable retain_dead_tuples if the publisher is in recovery"));
3025 :
3026 12 : ExecDropSingleTupleTableSlot(slot);
3027 :
3028 12 : walrcv_clear_result(res);
3029 12 : }
3030 :
3031 : /*
3032 : * Check if the subscriber's configuration is adequate to enable the
3033 : * retain_dead_tuples option.
3034 : *
3035 : * Issue an ERROR if the wal_level does not support the use of replication
3036 : * slots when check_guc is set to true.
3037 : *
3038 : * Issue a WARNING if track_commit_timestamp is not enabled when check_guc is
3039 : * set to true. This is only to highlight the importance of enabling
3040 : * track_commit_timestamp instead of catching all the misconfigurations, as
3041 : * this setting can be adjusted after subscription creation. Without it, the
3042 : * apply worker will simply skip conflict detection.
3043 : *
3044 : * Issue a WARNING or NOTICE if the subscription is disabled and the retention
3045 : * is active. Do not raise an ERROR since users can only modify
3046 : * retain_dead_tuples for disabled subscriptions. And as long as the
3047 : * subscription is enabled promptly, it will not pose issues.
3048 : *
3049 : * Issue a NOTICE to inform users that max_retention_duration is
3050 : * ineffective when retain_dead_tuples is disabled for a subscription. An ERROR
3051 : * is not issued because setting max_retention_duration causes no harm,
3052 : * even when it is ineffective.
3053 : */
3054 : void
3055 288 : CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
3056 : int elevel_for_sub_disabled,
3057 : bool retain_dead_tuples, bool retention_active,
3058 : bool max_retention_set)
3059 : {
3060 : Assert(elevel_for_sub_disabled == NOTICE ||
3061 : elevel_for_sub_disabled == WARNING);
3062 :
3063 288 : if (retain_dead_tuples)
3064 : {
3065 17 : if (check_guc && wal_level < WAL_LEVEL_REPLICA)
3066 0 : ereport(ERROR,
3067 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3068 : errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
3069 : errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
3070 :
3071 17 : if (check_guc && !track_commit_timestamp)
3072 4 : ereport(WARNING,
3073 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3074 : errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
3075 : errhint("Consider setting \"%s\" to true.",
3076 : "track_commit_timestamp"));
3077 :
3078 17 : if (sub_disabled && retention_active)
3079 7 : ereport(elevel_for_sub_disabled,
3080 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3081 : errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
3082 : (elevel_for_sub_disabled > NOTICE)
3083 : ? errhint("Consider setting %s to false.",
3084 : "retain_dead_tuples") : 0);
3085 : }
3086 271 : else if (max_retention_set)
3087 : {
3088 4 : ereport(NOTICE,
3089 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3090 : errmsg("max_retention_duration is ineffective when retain_dead_tuples is disabled"));
3091 : }
3092 288 : }
3093 :
3094 : /*
3095 : * Return true iff 'rv' is a member of the list.
3096 : */
3097 : static bool
3098 287 : list_member_rangevar(const List *list, RangeVar *rv)
3099 : {
3100 1044 : foreach_ptr(PublicationRelKind, relinfo, list)
3101 : {
3102 472 : if (equal(relinfo->rv, rv))
3103 1 : return true;
3104 : }
3105 :
3106 286 : return false;
3107 : }
3108 :
3109 : /*
3110 : * Get the list of tables and sequences which belong to specified publications
3111 : * on the publisher connection.
3112 : *
3113 : * Note that we don't support the case where the column list is different for
3114 : * the same table in different publications to avoid sending unwanted column
3115 : * information for some of the rows. This can happen when both the column
3116 : * list and row filter are specified for different publications.
3117 : */
3118 : static List *
3119 164 : fetch_relation_list(WalReceiverConn *wrconn, List *publications)
3120 : {
3121 : WalRcvExecResult *res;
3122 : StringInfoData cmd;
3123 : TupleTableSlot *slot;
3124 164 : Oid tableRow[4] = {TEXTOID, TEXTOID, CHAROID, InvalidOid};
3125 164 : List *relationlist = NIL;
3126 164 : int server_version = walrcv_server_version(wrconn);
3127 164 : bool check_columnlist = (server_version >= 150000);
3128 164 : int column_count = check_columnlist ? 4 : 3;
3129 : StringInfoData pub_names;
3130 :
3131 164 : initStringInfo(&cmd);
3132 164 : initStringInfo(&pub_names);
3133 :
3134 : /* Build the pub_names comma-separated string. */
3135 164 : GetPublicationsStr(publications, &pub_names, true);
3136 :
3137 : /* Get the list of relations from the publisher */
3138 164 : if (server_version >= 160000)
3139 : {
3140 164 : tableRow[3] = INT2VECTOROID;
3141 :
3142 : /*
3143 : * From version 16, we allowed passing multiple publications to the
3144 : * function pg_get_publication_tables. This helped to filter out the
3145 : * partition table whose ancestor is also published in this
3146 : * publication array.
3147 : *
3148 : * Join pg_get_publication_tables with pg_publication to exclude
3149 : * non-existing publications.
3150 : *
3151 : * Note that attrs are always stored in sorted order so we don't need
3152 : * to worry if different publications have specified them in a
3153 : * different order. See pub_collist_validate.
3154 : */
3155 164 : appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, c.relkind, gpt.attrs\n"
3156 : " FROM pg_class c\n"
3157 : " JOIN pg_namespace n ON n.oid = c.relnamespace\n"
3158 : " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
3159 : " FROM pg_publication\n"
3160 : " WHERE pubname IN ( %s )) AS gpt\n"
3161 : " ON gpt.relid = c.oid\n",
3162 : pub_names.data);
3163 :
3164 : /* From version 19, inclusion of sequences in the target is supported */
3165 164 : if (server_version >= 190000)
3166 164 : appendStringInfo(&cmd,
3167 : "UNION ALL\n"
3168 : " SELECT DISTINCT s.schemaname, s.sequencename, " CppAsString2(RELKIND_SEQUENCE) "::\"char\" AS relkind, NULL::int2vector AS attrs\n"
3169 : " FROM pg_catalog.pg_publication_sequences s\n"
3170 : " WHERE s.pubname IN ( %s )",
3171 : pub_names.data);
3172 : }
3173 : else
3174 : {
3175 0 : tableRow[3] = NAMEARRAYOID;
3176 0 : appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, " CppAsString2(RELKIND_RELATION) "::\"char\" AS relkind \n");
3177 :
3178 : /* Get column lists for each relation if the publisher supports it */
3179 0 : if (check_columnlist)
3180 0 : appendStringInfoString(&cmd, ", t.attnames\n");
3181 :
3182 0 : appendStringInfo(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
3183 : " WHERE t.pubname IN ( %s )",
3184 : pub_names.data);
3185 : }
3186 :
3187 164 : pfree(pub_names.data);
3188 :
3189 164 : res = walrcv_exec(wrconn, cmd.data, column_count, tableRow);
3190 164 : pfree(cmd.data);
3191 :
3192 164 : if (res->status != WALRCV_OK_TUPLES)
3193 0 : ereport(ERROR,
3194 : (errcode(ERRCODE_CONNECTION_FAILURE),
3195 : errmsg("could not receive list of replicated tables from the publisher: %s",
3196 : res->err)));
3197 :
3198 : /* Process tables. */
3199 164 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
3200 467 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
3201 : {
3202 : char *nspname;
3203 : char *relname;
3204 : bool isnull;
3205 : char relkind;
3206 304 : PublicationRelKind *relinfo = palloc_object(PublicationRelKind);
3207 :
3208 304 : nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
3209 : Assert(!isnull);
3210 304 : relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
3211 : Assert(!isnull);
3212 304 : relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
3213 : Assert(!isnull);
3214 :
3215 304 : relinfo->rv = makeRangeVar(nspname, relname, -1);
3216 304 : relinfo->relkind = relkind;
3217 :
3218 304 : if (relkind != RELKIND_SEQUENCE &&
3219 287 : check_columnlist &&
3220 287 : list_member_rangevar(relationlist, relinfo->rv))
3221 1 : ereport(ERROR,
3222 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3223 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
3224 : nspname, relname));
3225 : else
3226 303 : relationlist = lappend(relationlist, relinfo);
3227 :
3228 303 : ExecClearTuple(slot);
3229 : }
3230 163 : ExecDropSingleTupleTableSlot(slot);
3231 :
3232 163 : walrcv_clear_result(res);
3233 :
3234 163 : return relationlist;
3235 : }
3236 :
3237 : /*
3238 : * This is to report the connection failure while dropping replication slots.
3239 : * Here, we report the WARNING for all tablesync slots so that user can drop
3240 : * them manually, if required.
3241 : */
3242 : static void
3243 4 : ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
3244 : {
3245 : ListCell *lc;
3246 :
3247 4 : foreach(lc, rstates)
3248 : {
3249 0 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
3250 0 : Oid relid = rstate->relid;
3251 :
3252 : /* Only cleanup resources of tablesync workers */
3253 0 : if (!OidIsValid(relid))
3254 0 : continue;
3255 :
3256 : /*
3257 : * Caller needs to ensure that relstate doesn't change underneath us.
3258 : * See DropSubscription where we get the relstates.
3259 : */
3260 0 : if (rstate->state != SUBREL_STATE_SYNCDONE)
3261 : {
3262 0 : char syncslotname[NAMEDATALEN] = {0};
3263 :
3264 0 : ReplicationSlotNameForTablesync(subid, relid, syncslotname,
3265 : sizeof(syncslotname));
3266 0 : elog(WARNING, "could not drop tablesync replication slot \"%s\"",
3267 : syncslotname);
3268 : }
3269 : }
3270 :
3271 4 : ereport(ERROR,
3272 : (errcode(ERRCODE_CONNECTION_FAILURE),
3273 : errmsg("could not connect to publisher when attempting to drop replication slot \"%s\": %s",
3274 : slotname, err),
3275 : /* translator: %s is an SQL ALTER command */
3276 : errhint("Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
3277 : "ALTER SUBSCRIPTION ... DISABLE",
3278 : "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
3279 : }
3280 :
3281 : /*
3282 : * Check for duplicates in the given list of publications and error out if
3283 : * found one. Add publications to datums as text datums, if datums is not
3284 : * NULL.
3285 : */
3286 : static void
3287 264 : check_duplicates_in_publist(List *publist, Datum *datums)
3288 : {
3289 : ListCell *cell;
3290 264 : int j = 0;
3291 :
3292 602 : foreach(cell, publist)
3293 : {
3294 350 : char *name = strVal(lfirst(cell));
3295 : ListCell *pcell;
3296 :
3297 513 : foreach(pcell, publist)
3298 : {
3299 513 : char *pname = strVal(lfirst(pcell));
3300 :
3301 513 : if (pcell == cell)
3302 338 : break;
3303 :
3304 175 : if (strcmp(name, pname) == 0)
3305 12 : ereport(ERROR,
3306 : (errcode(ERRCODE_DUPLICATE_OBJECT),
3307 : errmsg("publication name \"%s\" used more than once",
3308 : pname)));
3309 : }
3310 :
3311 338 : if (datums)
3312 282 : datums[j++] = CStringGetTextDatum(name);
3313 : }
3314 252 : }
3315 :
3316 : /*
3317 : * Merge current subscription's publications and user-specified publications
3318 : * from ADD/DROP PUBLICATIONS.
3319 : *
3320 : * If addpub is true, we will add the list of publications into oldpublist.
3321 : * Otherwise, we will delete the list of publications from oldpublist. The
3322 : * returned list is a copy, oldpublist itself is not changed.
3323 : *
3324 : * subname is the subscription name, for error messages.
3325 : */
3326 : static List *
3327 35 : merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
3328 : {
3329 : ListCell *lc;
3330 :
3331 35 : oldpublist = list_copy(oldpublist);
3332 :
3333 35 : check_duplicates_in_publist(newpublist, NULL);
3334 :
3335 59 : foreach(lc, newpublist)
3336 : {
3337 44 : char *name = strVal(lfirst(lc));
3338 : ListCell *lc2;
3339 44 : bool found = false;
3340 :
3341 86 : foreach(lc2, oldpublist)
3342 : {
3343 71 : char *pubname = strVal(lfirst(lc2));
3344 :
3345 71 : if (strcmp(name, pubname) == 0)
3346 : {
3347 29 : found = true;
3348 29 : if (addpub)
3349 8 : ereport(ERROR,
3350 : (errcode(ERRCODE_DUPLICATE_OBJECT),
3351 : errmsg("publication \"%s\" is already in subscription \"%s\"",
3352 : name, subname)));
3353 : else
3354 21 : oldpublist = foreach_delete_current(oldpublist, lc2);
3355 :
3356 21 : break;
3357 : }
3358 : }
3359 :
3360 36 : if (addpub && !found)
3361 11 : oldpublist = lappend(oldpublist, makeString(name));
3362 25 : else if (!addpub && !found)
3363 4 : ereport(ERROR,
3364 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
3365 : errmsg("publication \"%s\" is not in subscription \"%s\"",
3366 : name, subname)));
3367 : }
3368 :
3369 : /*
3370 : * XXX Probably no strong reason for this, but for now it's to make ALTER
3371 : * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
3372 : */
3373 15 : if (!oldpublist)
3374 4 : ereport(ERROR,
3375 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
3376 : errmsg("cannot drop all the publications from a subscription")));
3377 :
3378 11 : return oldpublist;
3379 : }
3380 :
3381 : /*
3382 : * Extract the streaming mode value from a DefElem. This is like
3383 : * defGetBoolean() but also accepts the special value of "parallel".
3384 : */
3385 : char
3386 483 : defGetStreamingMode(DefElem *def)
3387 : {
3388 : /*
3389 : * If no parameter value given, assume "true" is meant.
3390 : */
3391 483 : if (!def->arg)
3392 0 : return LOGICALREP_STREAM_ON;
3393 :
3394 : /*
3395 : * Allow 0, 1, "false", "true", "off", "on" or "parallel".
3396 : */
3397 483 : switch (nodeTag(def->arg))
3398 : {
3399 0 : case T_Integer:
3400 0 : switch (intVal(def->arg))
3401 : {
3402 0 : case 0:
3403 0 : return LOGICALREP_STREAM_OFF;
3404 0 : case 1:
3405 0 : return LOGICALREP_STREAM_ON;
3406 0 : default:
3407 : /* otherwise, error out below */
3408 0 : break;
3409 : }
3410 0 : break;
3411 483 : default:
3412 : {
3413 483 : char *sval = defGetString(def);
3414 :
3415 : /*
3416 : * The set of strings accepted here should match up with the
3417 : * grammar's opt_boolean_or_string production.
3418 : */
3419 962 : if (pg_strcasecmp(sval, "false") == 0 ||
3420 479 : pg_strcasecmp(sval, "off") == 0)
3421 7 : return LOGICALREP_STREAM_OFF;
3422 940 : if (pg_strcasecmp(sval, "true") == 0 ||
3423 464 : pg_strcasecmp(sval, "on") == 0)
3424 49 : return LOGICALREP_STREAM_ON;
3425 427 : if (pg_strcasecmp(sval, "parallel") == 0)
3426 423 : return LOGICALREP_STREAM_PARALLEL;
3427 : }
3428 4 : break;
3429 : }
3430 :
3431 4 : ereport(ERROR,
3432 : (errcode(ERRCODE_SYNTAX_ERROR),
3433 : errmsg("%s requires a Boolean value or \"parallel\"",
3434 : def->defname)));
3435 : return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
3436 : }
|