Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * subscriptioncmds.c
4 : * subscription catalog manipulation functions
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/subscriptioncmds.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/commit_ts.h"
18 : #include "access/htup_details.h"
19 : #include "access/table.h"
20 : #include "access/twophase.h"
21 : #include "access/xact.h"
22 : #include "catalog/catalog.h"
23 : #include "catalog/dependency.h"
24 : #include "catalog/indexing.h"
25 : #include "catalog/namespace.h"
26 : #include "catalog/objectaccess.h"
27 : #include "catalog/objectaddress.h"
28 : #include "catalog/pg_authid_d.h"
29 : #include "catalog/pg_database_d.h"
30 : #include "catalog/pg_subscription.h"
31 : #include "catalog/pg_subscription_rel.h"
32 : #include "catalog/pg_type.h"
33 : #include "commands/defrem.h"
34 : #include "commands/event_trigger.h"
35 : #include "commands/subscriptioncmds.h"
36 : #include "executor/executor.h"
37 : #include "miscadmin.h"
38 : #include "nodes/makefuncs.h"
39 : #include "pgstat.h"
40 : #include "replication/logicallauncher.h"
41 : #include "replication/logicalworker.h"
42 : #include "replication/origin.h"
43 : #include "replication/slot.h"
44 : #include "replication/walreceiver.h"
45 : #include "replication/walsender.h"
46 : #include "replication/worker_internal.h"
47 : #include "storage/lmgr.h"
48 : #include "utils/acl.h"
49 : #include "utils/builtins.h"
50 : #include "utils/guc.h"
51 : #include "utils/lsyscache.h"
52 : #include "utils/memutils.h"
53 : #include "utils/pg_lsn.h"
54 : #include "utils/syscache.h"
55 :
56 : /*
57 : * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
58 : * command.
59 : */
60 : #define SUBOPT_CONNECT 0x00000001
61 : #define SUBOPT_ENABLED 0x00000002
62 : #define SUBOPT_CREATE_SLOT 0x00000004
63 : #define SUBOPT_SLOT_NAME 0x00000008
64 : #define SUBOPT_COPY_DATA 0x00000010
65 : #define SUBOPT_SYNCHRONOUS_COMMIT 0x00000020
66 : #define SUBOPT_REFRESH 0x00000040
67 : #define SUBOPT_BINARY 0x00000080
68 : #define SUBOPT_STREAMING 0x00000100
69 : #define SUBOPT_TWOPHASE_COMMIT 0x00000200
70 : #define SUBOPT_DISABLE_ON_ERR 0x00000400
71 : #define SUBOPT_PASSWORD_REQUIRED 0x00000800
72 : #define SUBOPT_RUN_AS_OWNER 0x00001000
73 : #define SUBOPT_FAILOVER 0x00002000
74 : #define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000
75 : #define SUBOPT_MAX_RETENTION_DURATION 0x00008000
76 : #define SUBOPT_LSN 0x00010000
77 : #define SUBOPT_ORIGIN 0x00020000
78 :
79 : /* check if the 'val' has 'bits' set */
80 : #define IsSet(val, bits) (((val) & (bits)) == (bits))
81 :
82 : /*
83 : * Structure to hold a bitmap representing the user-provided CREATE/ALTER
84 : * SUBSCRIPTION command options and the parsed/default values of each of them.
85 : */
86 : typedef struct SubOpts
87 : {
88 : bits32 specified_opts;
89 : char *slot_name;
90 : char *synchronous_commit;
91 : bool connect;
92 : bool enabled;
93 : bool create_slot;
94 : bool copy_data;
95 : bool refresh;
96 : bool binary;
97 : char streaming;
98 : bool twophase;
99 : bool disableonerr;
100 : bool passwordrequired;
101 : bool runasowner;
102 : bool failover;
103 : bool retaindeadtuples;
104 : int32 maxretention;
105 : char *origin;
106 : XLogRecPtr lsn;
107 : } SubOpts;
108 :
109 : /*
110 : * PublicationRelKind represents a relation included in a publication.
111 : * It stores the schema-qualified relation name (rv) and its kind (relkind).
112 : */
113 : typedef struct PublicationRelKind
114 : {
115 : RangeVar *rv;
116 : char relkind;
117 : } PublicationRelKind;
118 :
119 : static List *fetch_relation_list(WalReceiverConn *wrconn, List *publications);
120 : static void check_publications_origin_tables(WalReceiverConn *wrconn,
121 : List *publications, bool copydata,
122 : bool retain_dead_tuples,
123 : char *origin,
124 : Oid *subrel_local_oids,
125 : int subrel_count, char *subname);
126 : static void check_publications_origin_sequences(WalReceiverConn *wrconn,
127 : List *publications,
128 : bool copydata, char *origin,
129 : Oid *subrel_local_oids,
130 : int subrel_count,
131 : char *subname);
132 : static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn);
133 : static void check_duplicates_in_publist(List *publist, Datum *datums);
134 : static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
135 : static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
136 : static void CheckAlterSubOption(Subscription *sub, const char *option,
137 : bool slot_needs_update, bool isTopLevel);
138 :
139 :
140 : /*
141 : * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
142 : *
143 : * Since not all options can be specified in both commands, this function
144 : * will report an error if mutually exclusive options are specified.
145 : */
146 : static void
147 970 : parse_subscription_options(ParseState *pstate, List *stmt_options,
148 : bits32 supported_opts, SubOpts *opts)
149 : {
150 : ListCell *lc;
151 :
152 : /* Start out with cleared opts. */
153 970 : memset(opts, 0, sizeof(SubOpts));
154 :
155 : /* caller must expect some option */
156 : Assert(supported_opts != 0);
157 :
158 : /* If connect option is supported, these others also need to be. */
159 : Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
160 : IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
161 : SUBOPT_COPY_DATA));
162 :
163 : /* Set default values for the supported options. */
164 970 : if (IsSet(supported_opts, SUBOPT_CONNECT))
165 482 : opts->connect = true;
166 970 : if (IsSet(supported_opts, SUBOPT_ENABLED))
167 586 : opts->enabled = true;
168 970 : if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
169 482 : opts->create_slot = true;
170 970 : if (IsSet(supported_opts, SUBOPT_COPY_DATA))
171 624 : opts->copy_data = true;
172 970 : if (IsSet(supported_opts, SUBOPT_REFRESH))
173 86 : opts->refresh = true;
174 970 : if (IsSet(supported_opts, SUBOPT_BINARY))
175 700 : opts->binary = false;
176 970 : if (IsSet(supported_opts, SUBOPT_STREAMING))
177 700 : opts->streaming = LOGICALREP_STREAM_PARALLEL;
178 970 : if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
179 700 : opts->twophase = false;
180 970 : if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
181 700 : opts->disableonerr = false;
182 970 : if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED))
183 700 : opts->passwordrequired = true;
184 970 : if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER))
185 700 : opts->runasowner = false;
186 970 : if (IsSet(supported_opts, SUBOPT_FAILOVER))
187 700 : opts->failover = false;
188 970 : if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES))
189 700 : opts->retaindeadtuples = false;
190 970 : if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION))
191 700 : opts->maxretention = 0;
192 970 : if (IsSet(supported_opts, SUBOPT_ORIGIN))
193 700 : opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
194 :
195 : /* Parse options */
196 1920 : foreach(lc, stmt_options)
197 : {
198 1016 : DefElem *defel = (DefElem *) lfirst(lc);
199 :
200 1016 : if (IsSet(supported_opts, SUBOPT_CONNECT) &&
201 592 : strcmp(defel->defname, "connect") == 0)
202 : {
203 190 : if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
204 0 : errorConflictingDefElem(defel, pstate);
205 :
206 190 : opts->specified_opts |= SUBOPT_CONNECT;
207 190 : opts->connect = defGetBoolean(defel);
208 : }
209 826 : else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
210 506 : strcmp(defel->defname, "enabled") == 0)
211 : {
212 142 : if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
213 0 : errorConflictingDefElem(defel, pstate);
214 :
215 142 : opts->specified_opts |= SUBOPT_ENABLED;
216 142 : opts->enabled = defGetBoolean(defel);
217 : }
218 684 : else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
219 364 : strcmp(defel->defname, "create_slot") == 0)
220 : {
221 40 : if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
222 0 : errorConflictingDefElem(defel, pstate);
223 :
224 40 : opts->specified_opts |= SUBOPT_CREATE_SLOT;
225 40 : opts->create_slot = defGetBoolean(defel);
226 : }
227 644 : else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
228 546 : strcmp(defel->defname, "slot_name") == 0)
229 : {
230 156 : if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
231 0 : errorConflictingDefElem(defel, pstate);
232 :
233 156 : opts->specified_opts |= SUBOPT_SLOT_NAME;
234 156 : opts->slot_name = defGetString(defel);
235 :
236 : /* Setting slot_name = NONE is treated as no slot name. */
237 156 : if (strcmp(opts->slot_name, "none") == 0)
238 122 : opts->slot_name = NULL;
239 : else
240 34 : ReplicationSlotValidateName(opts->slot_name, false, ERROR);
241 : }
242 488 : else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
243 320 : strcmp(defel->defname, "copy_data") == 0)
244 : {
245 56 : if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
246 0 : errorConflictingDefElem(defel, pstate);
247 :
248 56 : opts->specified_opts |= SUBOPT_COPY_DATA;
249 56 : opts->copy_data = defGetBoolean(defel);
250 : }
251 432 : else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
252 342 : strcmp(defel->defname, "synchronous_commit") == 0)
253 : {
254 12 : if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
255 0 : errorConflictingDefElem(defel, pstate);
256 :
257 12 : opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
258 12 : opts->synchronous_commit = defGetString(defel);
259 :
260 : /* Test if the given value is valid for synchronous_commit GUC. */
261 12 : (void) set_config_option("synchronous_commit", opts->synchronous_commit,
262 : PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
263 : false, 0, false);
264 : }
265 420 : else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
266 66 : strcmp(defel->defname, "refresh") == 0)
267 : {
268 66 : if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
269 0 : errorConflictingDefElem(defel, pstate);
270 :
271 66 : opts->specified_opts |= SUBOPT_REFRESH;
272 66 : opts->refresh = defGetBoolean(defel);
273 : }
274 354 : else if (IsSet(supported_opts, SUBOPT_BINARY) &&
275 330 : strcmp(defel->defname, "binary") == 0)
276 : {
277 32 : if (IsSet(opts->specified_opts, SUBOPT_BINARY))
278 0 : errorConflictingDefElem(defel, pstate);
279 :
280 32 : opts->specified_opts |= SUBOPT_BINARY;
281 32 : opts->binary = defGetBoolean(defel);
282 : }
283 322 : else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
284 298 : strcmp(defel->defname, "streaming") == 0)
285 : {
286 74 : if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
287 0 : errorConflictingDefElem(defel, pstate);
288 :
289 74 : opts->specified_opts |= SUBOPT_STREAMING;
290 74 : opts->streaming = defGetStreamingMode(defel);
291 : }
292 248 : else if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
293 224 : strcmp(defel->defname, "two_phase") == 0)
294 : {
295 42 : if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
296 0 : errorConflictingDefElem(defel, pstate);
297 :
298 42 : opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
299 42 : opts->twophase = defGetBoolean(defel);
300 : }
301 206 : else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
302 182 : strcmp(defel->defname, "disable_on_error") == 0)
303 : {
304 20 : if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
305 0 : errorConflictingDefElem(defel, pstate);
306 :
307 20 : opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
308 20 : opts->disableonerr = defGetBoolean(defel);
309 : }
310 186 : else if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED) &&
311 162 : strcmp(defel->defname, "password_required") == 0)
312 : {
313 24 : if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
314 0 : errorConflictingDefElem(defel, pstate);
315 :
316 24 : opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
317 24 : opts->passwordrequired = defGetBoolean(defel);
318 : }
319 162 : else if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER) &&
320 138 : strcmp(defel->defname, "run_as_owner") == 0)
321 : {
322 18 : if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
323 0 : errorConflictingDefElem(defel, pstate);
324 :
325 18 : opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
326 18 : opts->runasowner = defGetBoolean(defel);
327 : }
328 144 : else if (IsSet(supported_opts, SUBOPT_FAILOVER) &&
329 120 : strcmp(defel->defname, "failover") == 0)
330 : {
331 26 : if (IsSet(opts->specified_opts, SUBOPT_FAILOVER))
332 0 : errorConflictingDefElem(defel, pstate);
333 :
334 26 : opts->specified_opts |= SUBOPT_FAILOVER;
335 26 : opts->failover = defGetBoolean(defel);
336 : }
337 118 : else if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES) &&
338 94 : strcmp(defel->defname, "retain_dead_tuples") == 0)
339 : {
340 24 : if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
341 0 : errorConflictingDefElem(defel, pstate);
342 :
343 24 : opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
344 24 : opts->retaindeadtuples = defGetBoolean(defel);
345 : }
346 94 : else if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION) &&
347 70 : strcmp(defel->defname, "max_retention_duration") == 0)
348 : {
349 22 : if (IsSet(opts->specified_opts, SUBOPT_MAX_RETENTION_DURATION))
350 0 : errorConflictingDefElem(defel, pstate);
351 :
352 22 : opts->specified_opts |= SUBOPT_MAX_RETENTION_DURATION;
353 22 : opts->maxretention = defGetInt32(defel);
354 : }
355 72 : else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
356 48 : strcmp(defel->defname, "origin") == 0)
357 : {
358 42 : if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
359 0 : errorConflictingDefElem(defel, pstate);
360 :
361 42 : opts->specified_opts |= SUBOPT_ORIGIN;
362 42 : pfree(opts->origin);
363 :
364 : /*
365 : * Even though the "origin" parameter allows only "none" and "any"
366 : * values, it is implemented as a string type so that the
367 : * parameter can be extended in future versions to support
368 : * filtering using origin names specified by the user.
369 : */
370 42 : opts->origin = defGetString(defel);
371 :
372 58 : if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
373 16 : (pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
374 6 : ereport(ERROR,
375 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
376 : errmsg("unrecognized origin value: \"%s\"", opts->origin));
377 : }
378 30 : else if (IsSet(supported_opts, SUBOPT_LSN) &&
379 24 : strcmp(defel->defname, "lsn") == 0)
380 18 : {
381 24 : char *lsn_str = defGetString(defel);
382 : XLogRecPtr lsn;
383 :
384 24 : if (IsSet(opts->specified_opts, SUBOPT_LSN))
385 0 : errorConflictingDefElem(defel, pstate);
386 :
387 : /* Setting lsn = NONE is treated as resetting LSN */
388 24 : if (strcmp(lsn_str, "none") == 0)
389 6 : lsn = InvalidXLogRecPtr;
390 : else
391 : {
392 : /* Parse the argument as LSN */
393 18 : lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
394 : CStringGetDatum(lsn_str)));
395 :
396 18 : if (XLogRecPtrIsInvalid(lsn))
397 6 : ereport(ERROR,
398 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
399 : errmsg("invalid WAL location (LSN): %s", lsn_str)));
400 : }
401 :
402 18 : opts->specified_opts |= SUBOPT_LSN;
403 18 : opts->lsn = lsn;
404 : }
405 : else
406 6 : ereport(ERROR,
407 : (errcode(ERRCODE_SYNTAX_ERROR),
408 : errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
409 : }
410 :
411 : /*
412 : * We've been explicitly asked to not connect, that requires some
413 : * additional processing.
414 : */
415 904 : if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
416 : {
417 : /* Check for incompatible options from the user. */
418 148 : if (opts->enabled &&
419 148 : IsSet(opts->specified_opts, SUBOPT_ENABLED))
420 6 : ereport(ERROR,
421 : (errcode(ERRCODE_SYNTAX_ERROR),
422 : /*- translator: both %s are strings of the form "option = value" */
423 : errmsg("%s and %s are mutually exclusive options",
424 : "connect = false", "enabled = true")));
425 :
426 142 : if (opts->create_slot &&
427 136 : IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
428 6 : ereport(ERROR,
429 : (errcode(ERRCODE_SYNTAX_ERROR),
430 : errmsg("%s and %s are mutually exclusive options",
431 : "connect = false", "create_slot = true")));
432 :
433 136 : if (opts->copy_data &&
434 130 : IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
435 6 : ereport(ERROR,
436 : (errcode(ERRCODE_SYNTAX_ERROR),
437 : errmsg("%s and %s are mutually exclusive options",
438 : "connect = false", "copy_data = true")));
439 :
440 : /* Change the defaults of other options. */
441 130 : opts->enabled = false;
442 130 : opts->create_slot = false;
443 130 : opts->copy_data = false;
444 : }
445 :
446 : /*
447 : * Do additional checking for disallowed combination when slot_name = NONE
448 : * was used.
449 : */
450 886 : if (!opts->slot_name &&
451 858 : IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
452 : {
453 116 : if (opts->enabled)
454 : {
455 18 : if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
456 6 : ereport(ERROR,
457 : (errcode(ERRCODE_SYNTAX_ERROR),
458 : /*- translator: both %s are strings of the form "option = value" */
459 : errmsg("%s and %s are mutually exclusive options",
460 : "slot_name = NONE", "enabled = true")));
461 : else
462 12 : ereport(ERROR,
463 : (errcode(ERRCODE_SYNTAX_ERROR),
464 : /*- translator: both %s are strings of the form "option = value" */
465 : errmsg("subscription with %s must also set %s",
466 : "slot_name = NONE", "enabled = false")));
467 : }
468 :
469 98 : if (opts->create_slot)
470 : {
471 12 : if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
472 6 : ereport(ERROR,
473 : (errcode(ERRCODE_SYNTAX_ERROR),
474 : /*- translator: both %s are strings of the form "option = value" */
475 : errmsg("%s and %s are mutually exclusive options",
476 : "slot_name = NONE", "create_slot = true")));
477 : else
478 6 : ereport(ERROR,
479 : (errcode(ERRCODE_SYNTAX_ERROR),
480 : /*- translator: both %s are strings of the form "option = value" */
481 : errmsg("subscription with %s must also set %s",
482 : "slot_name = NONE", "create_slot = false")));
483 : }
484 : }
485 856 : }
486 :
487 : /*
488 : * Check that the specified publications are present on the publisher.
489 : */
490 : static void
491 256 : check_publications(WalReceiverConn *wrconn, List *publications)
492 : {
493 : WalRcvExecResult *res;
494 : StringInfo cmd;
495 : TupleTableSlot *slot;
496 256 : List *publicationsCopy = NIL;
497 256 : Oid tableRow[1] = {TEXTOID};
498 :
499 256 : cmd = makeStringInfo();
500 256 : appendStringInfoString(cmd, "SELECT t.pubname FROM\n"
501 : " pg_catalog.pg_publication t WHERE\n"
502 : " t.pubname IN (");
503 256 : GetPublicationsStr(publications, cmd, true);
504 256 : appendStringInfoChar(cmd, ')');
505 :
506 256 : res = walrcv_exec(wrconn, cmd->data, 1, tableRow);
507 256 : destroyStringInfo(cmd);
508 :
509 256 : if (res->status != WALRCV_OK_TUPLES)
510 0 : ereport(ERROR,
511 : errmsg("could not receive list of publications from the publisher: %s",
512 : res->err));
513 :
514 256 : publicationsCopy = list_copy(publications);
515 :
516 : /* Process publication(s). */
517 256 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
518 564 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
519 : {
520 : char *pubname;
521 : bool isnull;
522 :
523 308 : pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
524 : Assert(!isnull);
525 :
526 : /* Delete the publication present in publisher from the list. */
527 308 : publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
528 308 : ExecClearTuple(slot);
529 : }
530 :
531 256 : ExecDropSingleTupleTableSlot(slot);
532 :
533 256 : walrcv_clear_result(res);
534 :
535 256 : if (list_length(publicationsCopy))
536 : {
537 : /* Prepare the list of non-existent publication(s) for error message. */
538 8 : StringInfo pubnames = makeStringInfo();
539 :
540 8 : GetPublicationsStr(publicationsCopy, pubnames, false);
541 8 : ereport(WARNING,
542 : errcode(ERRCODE_UNDEFINED_OBJECT),
543 : errmsg_plural("publication %s does not exist on the publisher",
544 : "publications %s do not exist on the publisher",
545 : list_length(publicationsCopy),
546 : pubnames->data));
547 : }
548 256 : }
549 :
550 : /*
551 : * Auxiliary function to build a text array out of a list of String nodes.
552 : */
553 : static Datum
554 394 : publicationListToArray(List *publist)
555 : {
556 : ArrayType *arr;
557 : Datum *datums;
558 : MemoryContext memcxt;
559 : MemoryContext oldcxt;
560 :
561 : /* Create memory context for temporary allocations. */
562 394 : memcxt = AllocSetContextCreate(CurrentMemoryContext,
563 : "publicationListToArray to array",
564 : ALLOCSET_DEFAULT_SIZES);
565 394 : oldcxt = MemoryContextSwitchTo(memcxt);
566 :
567 394 : datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
568 :
569 394 : check_duplicates_in_publist(publist, datums);
570 :
571 388 : MemoryContextSwitchTo(oldcxt);
572 :
573 388 : arr = construct_array_builtin(datums, list_length(publist), TEXTOID);
574 :
575 388 : MemoryContextDelete(memcxt);
576 :
577 388 : return PointerGetDatum(arr);
578 : }
579 :
580 : /*
581 : * Create new subscription.
582 : */
583 : ObjectAddress
584 482 : CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
585 : bool isTopLevel)
586 : {
587 : Relation rel;
588 : ObjectAddress myself;
589 : Oid subid;
590 : bool nulls[Natts_pg_subscription];
591 : Datum values[Natts_pg_subscription];
592 482 : Oid owner = GetUserId();
593 : HeapTuple tup;
594 : char *conninfo;
595 : char originname[NAMEDATALEN];
596 : List *publications;
597 : bits32 supported_opts;
598 482 : SubOpts opts = {0};
599 : AclResult aclresult;
600 :
601 : /*
602 : * Parse and check options.
603 : *
604 : * Connection and publication should not be specified here.
605 : */
606 482 : supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
607 : SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
608 : SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
609 : SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
610 : SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
611 : SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
612 : SUBOPT_RETAIN_DEAD_TUPLES |
613 : SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN);
614 482 : parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
615 :
616 : /*
617 : * Since creating a replication slot is not transactional, rolling back
618 : * the transaction leaves the created replication slot. So we cannot run
619 : * CREATE SUBSCRIPTION inside a transaction block if creating a
620 : * replication slot.
621 : */
622 392 : if (opts.create_slot)
623 252 : PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
624 :
625 : /*
626 : * We don't want to allow unprivileged users to be able to trigger
627 : * attempts to access arbitrary network destinations, so require the user
628 : * to have been specifically authorized to create subscriptions.
629 : */
630 386 : if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
631 6 : ereport(ERROR,
632 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
633 : errmsg("permission denied to create subscription"),
634 : errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
635 : "pg_create_subscription")));
636 :
637 : /*
638 : * Since a subscription is a database object, we also check for CREATE
639 : * permission on the database.
640 : */
641 380 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
642 : owner, ACL_CREATE);
643 380 : if (aclresult != ACLCHECK_OK)
644 12 : aclcheck_error(aclresult, OBJECT_DATABASE,
645 6 : get_database_name(MyDatabaseId));
646 :
647 : /*
648 : * Non-superusers are required to set a password for authentication, and
649 : * that password must be used by the target server, but the superuser can
650 : * exempt a subscription from this requirement.
651 : */
652 374 : if (!opts.passwordrequired && !superuser_arg(owner))
653 6 : ereport(ERROR,
654 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
655 : errmsg("password_required=false is superuser-only"),
656 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
657 :
658 : /*
659 : * If built with appropriate switch, whine when regression-testing
660 : * conventions for subscription names are violated.
661 : */
662 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
663 : if (strncmp(stmt->subname, "regress_", 8) != 0)
664 : elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
665 : #endif
666 :
667 368 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
668 :
669 : /* Check if name is used */
670 368 : subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
671 : ObjectIdGetDatum(MyDatabaseId), CStringGetDatum(stmt->subname));
672 368 : if (OidIsValid(subid))
673 : {
674 6 : ereport(ERROR,
675 : (errcode(ERRCODE_DUPLICATE_OBJECT),
676 : errmsg("subscription \"%s\" already exists",
677 : stmt->subname)));
678 : }
679 :
680 : /*
681 : * Ensure that system configuration paramters are set appropriately to
682 : * support retain_dead_tuples and max_retention_duration.
683 : */
684 362 : CheckSubDeadTupleRetention(true, !opts.enabled, WARNING,
685 362 : opts.retaindeadtuples, opts.retaindeadtuples,
686 362 : (opts.maxretention > 0));
687 :
688 362 : if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
689 320 : opts.slot_name == NULL)
690 320 : opts.slot_name = stmt->subname;
691 :
692 : /* The default for synchronous_commit of subscriptions is off. */
693 362 : if (opts.synchronous_commit == NULL)
694 362 : opts.synchronous_commit = "off";
695 :
696 362 : conninfo = stmt->conninfo;
697 362 : publications = stmt->publication;
698 :
699 : /* Load the library providing us libpq calls. */
700 362 : load_file("libpqwalreceiver", false);
701 :
702 : /* Check the connection info string. */
703 362 : walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
704 :
705 : /* Everything ok, form a new tuple. */
706 344 : memset(values, 0, sizeof(values));
707 344 : memset(nulls, false, sizeof(nulls));
708 :
709 344 : subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
710 : Anum_pg_subscription_oid);
711 344 : values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
712 344 : values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
713 344 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
714 344 : values[Anum_pg_subscription_subname - 1] =
715 344 : DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
716 344 : values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
717 344 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
718 344 : values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
719 344 : values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
720 344 : values[Anum_pg_subscription_subtwophasestate - 1] =
721 344 : CharGetDatum(opts.twophase ?
722 : LOGICALREP_TWOPHASE_STATE_PENDING :
723 : LOGICALREP_TWOPHASE_STATE_DISABLED);
724 344 : values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
725 344 : values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
726 344 : values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
727 344 : values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
728 344 : values[Anum_pg_subscription_subretaindeadtuples - 1] =
729 344 : BoolGetDatum(opts.retaindeadtuples);
730 344 : values[Anum_pg_subscription_submaxretention - 1] =
731 344 : Int32GetDatum(opts.maxretention);
732 344 : values[Anum_pg_subscription_subretentionactive - 1] =
733 344 : Int32GetDatum(opts.retaindeadtuples);
734 344 : values[Anum_pg_subscription_subconninfo - 1] =
735 344 : CStringGetTextDatum(conninfo);
736 344 : if (opts.slot_name)
737 324 : values[Anum_pg_subscription_subslotname - 1] =
738 324 : DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
739 : else
740 20 : nulls[Anum_pg_subscription_subslotname - 1] = true;
741 344 : values[Anum_pg_subscription_subsynccommit - 1] =
742 344 : CStringGetTextDatum(opts.synchronous_commit);
743 338 : values[Anum_pg_subscription_subpublications - 1] =
744 344 : publicationListToArray(publications);
745 338 : values[Anum_pg_subscription_suborigin - 1] =
746 338 : CStringGetTextDatum(opts.origin);
747 :
748 338 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
749 :
750 : /* Insert tuple into catalog. */
751 338 : CatalogTupleInsert(rel, tup);
752 338 : heap_freetuple(tup);
753 :
754 338 : recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
755 :
756 : /*
757 : * A replication origin is currently created for all subscriptions,
758 : * including those that only contain sequences or are otherwise empty.
759 : *
760 : * XXX: While this is technically unnecessary, optimizing it would require
761 : * additional logic to skip origin creation during DDL operations and
762 : * apply workers initialization, and to handle origin creation dynamically
763 : * when tables are added to the subscription. It is not clear whether
764 : * preventing creation of origins is worth additional complexity.
765 : */
766 338 : ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
767 338 : replorigin_create(originname);
768 :
769 : /*
770 : * Connect to remote side to execute requested commands and fetch table
771 : * and sequence info.
772 : */
773 338 : if (opts.connect)
774 : {
775 : char *err;
776 : WalReceiverConn *wrconn;
777 : bool must_use_password;
778 :
779 : /* Try to connect to the publisher. */
780 244 : must_use_password = !superuser_arg(owner) && opts.passwordrequired;
781 244 : wrconn = walrcv_connect(conninfo, true, true, must_use_password,
782 : stmt->subname, &err);
783 244 : if (!wrconn)
784 6 : ereport(ERROR,
785 : (errcode(ERRCODE_CONNECTION_FAILURE),
786 : errmsg("subscription \"%s\" could not connect to the publisher: %s",
787 : stmt->subname, err)));
788 :
789 238 : PG_TRY();
790 : {
791 238 : bool has_tables = false;
792 : List *pubrels;
793 : char relation_state;
794 :
795 238 : check_publications(wrconn, publications);
796 238 : check_publications_origin_tables(wrconn, publications,
797 238 : opts.copy_data,
798 238 : opts.retaindeadtuples, opts.origin,
799 : NULL, 0, stmt->subname);
800 238 : check_publications_origin_sequences(wrconn, publications,
801 238 : opts.copy_data, opts.origin,
802 : NULL, 0, stmt->subname);
803 :
804 238 : if (opts.retaindeadtuples)
805 6 : check_pub_dead_tuple_retention(wrconn);
806 :
807 : /*
808 : * Set sync state based on if we were asked to do data copy or
809 : * not.
810 : */
811 238 : relation_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
812 :
813 : /*
814 : * Build local relation status info. Relations are for both tables
815 : * and sequences from the publisher.
816 : */
817 238 : pubrels = fetch_relation_list(wrconn, publications);
818 :
819 834 : foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
820 : {
821 : Oid relid;
822 : char relkind;
823 362 : RangeVar *rv = pubrelinfo->rv;
824 :
825 362 : relid = RangeVarGetRelid(rv, AccessShareLock, false);
826 362 : relkind = get_rel_relkind(relid);
827 :
828 : /* Check for supported relkind. */
829 362 : CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
830 362 : rv->schemaname, rv->relname);
831 362 : has_tables |= (relkind != RELKIND_SEQUENCE);
832 362 : AddSubscriptionRelState(subid, relid, relation_state,
833 : InvalidXLogRecPtr, true);
834 : }
835 :
836 : /*
837 : * If requested, create permanent slot for the subscription. We
838 : * won't use the initial snapshot for anything, so no need to
839 : * export it.
840 : *
841 : * XXX: Similar to origins, it is not clear whether preventing the
842 : * slot creation for empty and sequence-only subscriptions is
843 : * worth additional complexity.
844 : */
845 236 : if (opts.create_slot)
846 : {
847 226 : bool twophase_enabled = false;
848 :
849 : Assert(opts.slot_name);
850 :
851 : /*
852 : * Even if two_phase is set, don't create the slot with
853 : * two-phase enabled. Will enable it once all the tables are
854 : * synced and ready. This avoids race-conditions like prepared
855 : * transactions being skipped due to changes not being applied
856 : * due to checks in should_apply_changes_for_rel() when
857 : * tablesync for the corresponding tables are in progress. See
858 : * comments atop worker.c.
859 : *
860 : * Note that if tables were specified but copy_data is false
861 : * then it is safe to enable two_phase up-front because those
862 : * tables are already initially in READY state. When the
863 : * subscription has no tables, we leave the twophase state as
864 : * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
865 : * PUBLICATION to work.
866 : */
867 226 : if (opts.twophase && !opts.copy_data && has_tables)
868 2 : twophase_enabled = true;
869 :
870 226 : walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
871 : opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
872 :
873 226 : if (twophase_enabled)
874 2 : UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
875 :
876 226 : ereport(NOTICE,
877 : (errmsg("created replication slot \"%s\" on publisher",
878 : opts.slot_name)));
879 : }
880 : }
881 2 : PG_FINALLY();
882 : {
883 238 : walrcv_disconnect(wrconn);
884 : }
885 238 : PG_END_TRY();
886 : }
887 : else
888 94 : ereport(WARNING,
889 : (errmsg("subscription was created, but is not connected"),
890 : errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.")));
891 :
892 330 : table_close(rel, RowExclusiveLock);
893 :
894 330 : pgstat_create_subscription(subid);
895 :
896 : /*
897 : * Notify the launcher to start the apply worker if the subscription is
898 : * enabled, or to create the conflict detection slot if retain_dead_tuples
899 : * is enabled.
900 : *
901 : * Creating the conflict detection slot is essential even when the
902 : * subscription is not enabled. This ensures that dead tuples are
903 : * retained, which is necessary for accurately identifying the type of
904 : * conflict during replication.
905 : */
906 330 : if (opts.enabled || opts.retaindeadtuples)
907 226 : ApplyLauncherWakeupAtCommit();
908 :
909 330 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
910 :
911 330 : InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
912 :
913 330 : return myself;
914 : }
915 :
916 : static void
917 70 : AlterSubscription_refresh(Subscription *sub, bool copy_data,
918 : List *validate_publications)
919 : {
920 : char *err;
921 70 : List *pubrels = NIL;
922 : Oid *pubrel_local_oids;
923 : List *subrel_states;
924 70 : List *sub_remove_rels = NIL;
925 : Oid *subrel_local_oids;
926 : Oid *subseq_local_oids;
927 : int subrel_count;
928 : ListCell *lc;
929 : int off;
930 70 : int tbl_count = 0;
931 70 : int seq_count = 0;
932 70 : Relation rel = NULL;
933 : typedef struct SubRemoveRels
934 : {
935 : Oid relid;
936 : char state;
937 : } SubRemoveRels;
938 :
939 : WalReceiverConn *wrconn;
940 : bool must_use_password;
941 :
942 : /* Load the library providing us libpq calls. */
943 70 : load_file("libpqwalreceiver", false);
944 :
945 : /* Try to connect to the publisher. */
946 70 : must_use_password = sub->passwordrequired && !sub->ownersuperuser;
947 70 : wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
948 : sub->name, &err);
949 68 : if (!wrconn)
950 0 : ereport(ERROR,
951 : (errcode(ERRCODE_CONNECTION_FAILURE),
952 : errmsg("subscription \"%s\" could not connect to the publisher: %s",
953 : sub->name, err)));
954 :
955 68 : PG_TRY();
956 : {
957 68 : if (validate_publications)
958 18 : check_publications(wrconn, validate_publications);
959 :
960 : /* Get the relation list from publisher. */
961 68 : pubrels = fetch_relation_list(wrconn, sub->publications);
962 :
963 : /* Get local relation list. */
964 68 : subrel_states = GetSubscriptionRelations(sub->oid, true, true, false);
965 68 : subrel_count = list_length(subrel_states);
966 :
967 : /*
968 : * Build qsorted arrays of local table oids and sequence oids for
969 : * faster lookup. This can potentially contain all tables and
970 : * sequences in the database so speed of lookup is important.
971 : *
972 : * We do not yet know the exact count of tables and sequences, so we
973 : * allocate separate arrays for table OIDs and sequence OIDs based on
974 : * the total number of relations (subrel_count).
975 : */
976 68 : subrel_local_oids = palloc(subrel_count * sizeof(Oid));
977 68 : subseq_local_oids = palloc(subrel_count * sizeof(Oid));
978 244 : foreach(lc, subrel_states)
979 : {
980 176 : SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
981 :
982 176 : if (get_rel_relkind(relstate->relid) == RELKIND_SEQUENCE)
983 0 : subseq_local_oids[seq_count++] = relstate->relid;
984 : else
985 176 : subrel_local_oids[tbl_count++] = relstate->relid;
986 : }
987 :
988 68 : qsort(subrel_local_oids, tbl_count, sizeof(Oid), oid_cmp);
989 68 : check_publications_origin_tables(wrconn, sub->publications, copy_data,
990 68 : sub->retaindeadtuples, sub->origin,
991 : subrel_local_oids, tbl_count,
992 : sub->name);
993 :
994 68 : qsort(subseq_local_oids, seq_count, sizeof(Oid), oid_cmp);
995 68 : check_publications_origin_sequences(wrconn, sub->publications,
996 : copy_data, sub->origin,
997 : subseq_local_oids, seq_count,
998 : sub->name);
999 :
1000 : /*
1001 : * Walk over the remote relations and try to match them to locally
1002 : * known relations. If the relation is not known locally create a new
1003 : * state for it.
1004 : *
1005 : * Also builds array of local oids of remote relations for the next
1006 : * step.
1007 : */
1008 68 : off = 0;
1009 68 : pubrel_local_oids = palloc(list_length(pubrels) * sizeof(Oid));
1010 :
1011 316 : foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
1012 : {
1013 180 : RangeVar *rv = pubrelinfo->rv;
1014 : Oid relid;
1015 : char relkind;
1016 :
1017 180 : relid = RangeVarGetRelid(rv, AccessShareLock, false);
1018 180 : relkind = get_rel_relkind(relid);
1019 :
1020 : /* Check for supported relkind. */
1021 180 : CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
1022 180 : rv->schemaname, rv->relname);
1023 :
1024 180 : pubrel_local_oids[off++] = relid;
1025 :
1026 180 : if (!bsearch(&relid, subrel_local_oids,
1027 46 : tbl_count, sizeof(Oid), oid_cmp) &&
1028 46 : !bsearch(&relid, subseq_local_oids,
1029 : seq_count, sizeof(Oid), oid_cmp))
1030 : {
1031 46 : AddSubscriptionRelState(sub->oid, relid,
1032 : copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
1033 : InvalidXLogRecPtr, true);
1034 46 : ereport(DEBUG1,
1035 : errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"",
1036 : relkind == RELKIND_SEQUENCE ? "sequence" : "table",
1037 : rv->schemaname, rv->relname, sub->name));
1038 : }
1039 : }
1040 :
1041 : /*
1042 : * Next remove state for tables we should not care about anymore using
1043 : * the data we collected above
1044 : */
1045 68 : qsort(pubrel_local_oids, list_length(pubrels), sizeof(Oid), oid_cmp);
1046 :
1047 244 : for (off = 0; off < tbl_count; off++)
1048 : {
1049 176 : Oid relid = subrel_local_oids[off];
1050 :
1051 176 : if (!bsearch(&relid, pubrel_local_oids,
1052 176 : list_length(pubrels), sizeof(Oid), oid_cmp))
1053 : {
1054 : char state;
1055 : XLogRecPtr statelsn;
1056 42 : SubRemoveRels *remove_rel = palloc(sizeof(SubRemoveRels));
1057 :
1058 : /*
1059 : * Lock pg_subscription_rel with AccessExclusiveLock to
1060 : * prevent any race conditions with the apply worker
1061 : * re-launching workers at the same time this code is trying
1062 : * to remove those tables.
1063 : *
1064 : * Even if new worker for this particular rel is restarted it
1065 : * won't be able to make any progress as we hold exclusive
1066 : * lock on pg_subscription_rel till the transaction end. It
1067 : * will simply exit as there is no corresponding rel entry.
1068 : *
1069 : * This locking also ensures that the state of rels won't
1070 : * change till we are done with this refresh operation.
1071 : */
1072 42 : if (!rel)
1073 18 : rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
1074 :
1075 : /* Last known rel state. */
1076 42 : state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
1077 :
1078 42 : RemoveSubscriptionRel(sub->oid, relid);
1079 :
1080 42 : remove_rel->relid = relid;
1081 42 : remove_rel->state = state;
1082 :
1083 42 : sub_remove_rels = lappend(sub_remove_rels, remove_rel);
1084 :
1085 42 : logicalrep_worker_stop(WORKERTYPE_TABLESYNC, sub->oid, relid);
1086 :
1087 : /*
1088 : * For READY state, we would have already dropped the
1089 : * tablesync origin.
1090 : */
1091 42 : if (state != SUBREL_STATE_READY)
1092 : {
1093 : char originname[NAMEDATALEN];
1094 :
1095 : /*
1096 : * Drop the tablesync's origin tracking if exists.
1097 : *
1098 : * It is possible that the origin is not yet created for
1099 : * tablesync worker, this can happen for the states before
1100 : * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or
1101 : * apply worker can also concurrently try to drop the
1102 : * origin and by this time the origin might be already
1103 : * removed. For these reasons, passing missing_ok = true.
1104 : */
1105 0 : ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
1106 : sizeof(originname));
1107 0 : replorigin_drop_by_name(originname, true, false);
1108 : }
1109 :
1110 42 : ereport(DEBUG1,
1111 : (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
1112 : get_namespace_name(get_rel_namespace(relid)),
1113 : get_rel_name(relid),
1114 : sub->name)));
1115 : }
1116 : }
1117 :
1118 : /*
1119 : * Drop the tablesync slots associated with removed tables. This has
1120 : * to be at the end because otherwise if there is an error while doing
1121 : * the database operations we won't be able to rollback dropped slots.
1122 : */
1123 178 : foreach_ptr(SubRemoveRels, rel, sub_remove_rels)
1124 : {
1125 42 : if (rel->state != SUBREL_STATE_READY &&
1126 0 : rel->state != SUBREL_STATE_SYNCDONE)
1127 : {
1128 0 : char syncslotname[NAMEDATALEN] = {0};
1129 :
1130 : /*
1131 : * For READY/SYNCDONE states we know the tablesync slot has
1132 : * already been dropped by the tablesync worker.
1133 : *
1134 : * For other states, there is no certainty, maybe the slot
1135 : * does not exist yet. Also, if we fail after removing some of
1136 : * the slots, next time, it will again try to drop already
1137 : * dropped slots and fail. For these reasons, we allow
1138 : * missing_ok = true for the drop.
1139 : */
1140 0 : ReplicationSlotNameForTablesync(sub->oid, rel->relid,
1141 : syncslotname, sizeof(syncslotname));
1142 0 : ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
1143 : }
1144 : }
1145 :
1146 : /*
1147 : * Next remove state for sequences we should not care about anymore
1148 : * using the data we collected above
1149 : */
1150 68 : for (off = 0; off < seq_count; off++)
1151 : {
1152 0 : Oid relid = subseq_local_oids[off];
1153 :
1154 0 : if (!bsearch(&relid, pubrel_local_oids,
1155 0 : list_length(pubrels), sizeof(Oid), oid_cmp))
1156 : {
1157 : /*
1158 : * This locking ensures that the state of rels won't change
1159 : * till we are done with this refresh operation.
1160 : */
1161 0 : if (!rel)
1162 0 : rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
1163 :
1164 0 : RemoveSubscriptionRel(sub->oid, relid);
1165 :
1166 0 : ereport(DEBUG1,
1167 : errmsg_internal("sequence \"%s.%s\" removed from subscription \"%s\"",
1168 : get_namespace_name(get_rel_namespace(relid)),
1169 : get_rel_name(relid),
1170 : sub->name));
1171 : }
1172 : }
1173 : }
1174 0 : PG_FINALLY();
1175 : {
1176 68 : walrcv_disconnect(wrconn);
1177 : }
1178 68 : PG_END_TRY();
1179 :
1180 68 : if (rel)
1181 18 : table_close(rel, NoLock);
1182 68 : }
1183 :
1184 : /*
1185 : * Marks all sequences with INIT state.
1186 : */
1187 : static void
1188 0 : AlterSubscription_refresh_seq(Subscription *sub)
1189 : {
1190 0 : char *err = NULL;
1191 : WalReceiverConn *wrconn;
1192 : bool must_use_password;
1193 :
1194 : /* Load the library providing us libpq calls. */
1195 0 : load_file("libpqwalreceiver", false);
1196 :
1197 : /* Try to connect to the publisher. */
1198 0 : must_use_password = sub->passwordrequired && !sub->ownersuperuser;
1199 0 : wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
1200 : sub->name, &err);
1201 0 : if (!wrconn)
1202 0 : ereport(ERROR,
1203 : errcode(ERRCODE_CONNECTION_FAILURE),
1204 : errmsg("subscription \"%s\" could not connect to the publisher: %s",
1205 : sub->name, err));
1206 :
1207 0 : PG_TRY();
1208 : {
1209 : List *subrel_states;
1210 :
1211 0 : check_publications_origin_sequences(wrconn, sub->publications, true,
1212 : sub->origin, NULL, 0, sub->name);
1213 :
1214 : /* Get local sequence list. */
1215 0 : subrel_states = GetSubscriptionRelations(sub->oid, false, true, false);
1216 0 : foreach_ptr(SubscriptionRelState, subrel, subrel_states)
1217 : {
1218 0 : Oid relid = subrel->relid;
1219 :
1220 0 : UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT,
1221 : InvalidXLogRecPtr, false);
1222 0 : ereport(DEBUG1,
1223 : errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
1224 : get_namespace_name(get_rel_namespace(relid)),
1225 : get_rel_name(relid),
1226 : sub->name));
1227 : }
1228 : }
1229 0 : PG_FINALLY();
1230 : {
1231 0 : walrcv_disconnect(wrconn);
1232 : }
1233 0 : PG_END_TRY();
1234 0 : }
1235 :
1236 : /*
1237 : * Common checks for altering failover, two_phase, and retain_dead_tuples
1238 : * options.
1239 : */
1240 : static void
1241 26 : CheckAlterSubOption(Subscription *sub, const char *option,
1242 : bool slot_needs_update, bool isTopLevel)
1243 : {
1244 : Assert(strcmp(option, "failover") == 0 ||
1245 : strcmp(option, "two_phase") == 0 ||
1246 : strcmp(option, "retain_dead_tuples") == 0);
1247 :
1248 : /*
1249 : * Altering the retain_dead_tuples option does not update the slot on the
1250 : * publisher.
1251 : */
1252 : Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0);
1253 :
1254 : /*
1255 : * Do not allow changing the option if the subscription is enabled. This
1256 : * is because both failover and two_phase options of the slot on the
1257 : * publisher cannot be modified if the slot is currently acquired by the
1258 : * existing walsender.
1259 : *
1260 : * Note that two_phase is enabled (aka changed from 'false' to 'true') on
1261 : * the publisher by the existing walsender, so we could have allowed that
1262 : * even when the subscription is enabled. But we kept this restriction for
1263 : * the sake of consistency and simplicity.
1264 : *
1265 : * Additionally, do not allow changing the retain_dead_tuples option when
1266 : * the subscription is enabled to prevent race conditions arising from the
1267 : * new option value being acknowledged asynchronously by the launcher and
1268 : * apply workers.
1269 : *
1270 : * Without the restriction, a race condition may arise when a user
1271 : * disables and immediately re-enables the retain_dead_tuples option. In
1272 : * this case, the launcher might drop the slot upon noticing the disabled
1273 : * action, while the apply worker may keep maintaining
1274 : * oldest_nonremovable_xid without noticing the option change. During this
1275 : * period, a transaction ID wraparound could falsely make this ID appear
1276 : * as if it originates from the future w.r.t the transaction ID stored in
1277 : * the slot maintained by launcher.
1278 : *
1279 : * Similarly, if the user enables retain_dead_tuples concurrently with the
1280 : * launcher starting the worker, the apply worker may start calculating
1281 : * oldest_nonremovable_xid before the launcher notices the enable action.
1282 : * Consequently, the launcher may update slot.xmin to a newer value than
1283 : * that maintained by the worker. In subsequent cycles, upon integrating
1284 : * the worker's oldest_nonremovable_xid, the launcher might detect a
1285 : * retreat in the calculated xmin, necessitating additional handling.
1286 : *
1287 : * XXX To address the above race conditions, we can define
1288 : * oldest_nonremovable_xid as FullTransactionID and adds the check to
1289 : * disallow retreating the conflict slot's xmin. For now, we kept the
1290 : * implementation simple by disallowing change to the retain_dead_tuples,
1291 : * but in the future we can change this after some more analysis.
1292 : *
1293 : * Note that we could restrict only the enabling of retain_dead_tuples to
1294 : * avoid the race conditions described above, but we maintain the
1295 : * restriction for both enable and disable operations for the sake of
1296 : * consistency.
1297 : */
1298 26 : if (sub->enabled)
1299 4 : ereport(ERROR,
1300 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1301 : errmsg("cannot set option \"%s\" for enabled subscription",
1302 : option)));
1303 :
1304 22 : if (slot_needs_update)
1305 : {
1306 : StringInfoData cmd;
1307 :
1308 : /*
1309 : * A valid slot must be associated with the subscription for us to
1310 : * modify any of the slot's properties.
1311 : */
1312 16 : if (!sub->slotname)
1313 0 : ereport(ERROR,
1314 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1315 : errmsg("cannot set option \"%s\" for a subscription that does not have a slot name",
1316 : option)));
1317 :
1318 : /* The changed option of the slot can't be rolled back. */
1319 16 : initStringInfo(&cmd);
1320 16 : appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
1321 :
1322 16 : PreventInTransactionBlock(isTopLevel, cmd.data);
1323 10 : pfree(cmd.data);
1324 : }
1325 16 : }
1326 :
1327 : /*
1328 : * Alter the existing subscription.
1329 : */
1330 : ObjectAddress
1331 520 : AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1332 : bool isTopLevel)
1333 : {
1334 : Relation rel;
1335 : ObjectAddress myself;
1336 : bool nulls[Natts_pg_subscription];
1337 : bool replaces[Natts_pg_subscription];
1338 : Datum values[Natts_pg_subscription];
1339 : HeapTuple tup;
1340 : Oid subid;
1341 520 : bool update_tuple = false;
1342 520 : bool update_failover = false;
1343 520 : bool update_two_phase = false;
1344 520 : bool check_pub_rdt = false;
1345 : bool retain_dead_tuples;
1346 : int max_retention;
1347 : bool retention_active;
1348 : char *origin;
1349 : Subscription *sub;
1350 : Form_pg_subscription form;
1351 : bits32 supported_opts;
1352 520 : SubOpts opts = {0};
1353 :
1354 520 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1355 :
1356 : /* Fetch the existing tuple. */
1357 520 : tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
1358 : CStringGetDatum(stmt->subname));
1359 :
1360 520 : if (!HeapTupleIsValid(tup))
1361 6 : ereport(ERROR,
1362 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1363 : errmsg("subscription \"%s\" does not exist",
1364 : stmt->subname)));
1365 :
1366 514 : form = (Form_pg_subscription) GETSTRUCT(tup);
1367 514 : subid = form->oid;
1368 :
1369 : /* must be owner */
1370 514 : if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
1371 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
1372 0 : stmt->subname);
1373 :
1374 514 : sub = GetSubscription(subid, false);
1375 :
1376 514 : retain_dead_tuples = sub->retaindeadtuples;
1377 514 : origin = sub->origin;
1378 514 : max_retention = sub->maxretention;
1379 514 : retention_active = sub->retentionactive;
1380 :
1381 : /*
1382 : * Don't allow non-superuser modification of a subscription with
1383 : * password_required=false.
1384 : */
1385 514 : if (!sub->passwordrequired && !superuser())
1386 0 : ereport(ERROR,
1387 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1388 : errmsg("password_required=false is superuser-only"),
1389 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1390 :
1391 : /* Lock the subscription so nobody else can do anything with it. */
1392 514 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
1393 :
1394 : /* Form a new tuple. */
1395 514 : memset(values, 0, sizeof(values));
1396 514 : memset(nulls, false, sizeof(nulls));
1397 514 : memset(replaces, false, sizeof(replaces));
1398 :
1399 514 : switch (stmt->kind)
1400 : {
1401 218 : case ALTER_SUBSCRIPTION_OPTIONS:
1402 : {
1403 218 : supported_opts = (SUBOPT_SLOT_NAME |
1404 : SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
1405 : SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
1406 : SUBOPT_DISABLE_ON_ERR |
1407 : SUBOPT_PASSWORD_REQUIRED |
1408 : SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
1409 : SUBOPT_RETAIN_DEAD_TUPLES |
1410 : SUBOPT_MAX_RETENTION_DURATION |
1411 : SUBOPT_ORIGIN);
1412 :
1413 218 : parse_subscription_options(pstate, stmt->options,
1414 : supported_opts, &opts);
1415 :
1416 200 : if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1417 : {
1418 : /*
1419 : * The subscription must be disabled to allow slot_name as
1420 : * 'none', otherwise, the apply worker will repeatedly try
1421 : * to stream the data using that slot_name which neither
1422 : * exists on the publisher nor the user will be allowed to
1423 : * create it.
1424 : */
1425 72 : if (sub->enabled && !opts.slot_name)
1426 0 : ereport(ERROR,
1427 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1428 : errmsg("cannot set %s for enabled subscription",
1429 : "slot_name = NONE")));
1430 :
1431 72 : if (opts.slot_name)
1432 6 : values[Anum_pg_subscription_subslotname - 1] =
1433 6 : DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
1434 : else
1435 66 : nulls[Anum_pg_subscription_subslotname - 1] = true;
1436 72 : replaces[Anum_pg_subscription_subslotname - 1] = true;
1437 : }
1438 :
1439 200 : if (opts.synchronous_commit)
1440 : {
1441 6 : values[Anum_pg_subscription_subsynccommit - 1] =
1442 6 : CStringGetTextDatum(opts.synchronous_commit);
1443 6 : replaces[Anum_pg_subscription_subsynccommit - 1] = true;
1444 : }
1445 :
1446 200 : if (IsSet(opts.specified_opts, SUBOPT_BINARY))
1447 : {
1448 18 : values[Anum_pg_subscription_subbinary - 1] =
1449 18 : BoolGetDatum(opts.binary);
1450 18 : replaces[Anum_pg_subscription_subbinary - 1] = true;
1451 : }
1452 :
1453 200 : if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
1454 : {
1455 30 : values[Anum_pg_subscription_substream - 1] =
1456 30 : CharGetDatum(opts.streaming);
1457 30 : replaces[Anum_pg_subscription_substream - 1] = true;
1458 : }
1459 :
1460 200 : if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
1461 : {
1462 : values[Anum_pg_subscription_subdisableonerr - 1]
1463 6 : = BoolGetDatum(opts.disableonerr);
1464 : replaces[Anum_pg_subscription_subdisableonerr - 1]
1465 6 : = true;
1466 : }
1467 :
1468 200 : if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
1469 : {
1470 : /* Non-superuser may not disable password_required. */
1471 12 : if (!opts.passwordrequired && !superuser())
1472 0 : ereport(ERROR,
1473 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1474 : errmsg("password_required=false is superuser-only"),
1475 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1476 :
1477 : values[Anum_pg_subscription_subpasswordrequired - 1]
1478 12 : = BoolGetDatum(opts.passwordrequired);
1479 : replaces[Anum_pg_subscription_subpasswordrequired - 1]
1480 12 : = true;
1481 : }
1482 :
1483 200 : if (IsSet(opts.specified_opts, SUBOPT_RUN_AS_OWNER))
1484 : {
1485 14 : values[Anum_pg_subscription_subrunasowner - 1] =
1486 14 : BoolGetDatum(opts.runasowner);
1487 14 : replaces[Anum_pg_subscription_subrunasowner - 1] = true;
1488 : }
1489 :
1490 200 : if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
1491 : {
1492 : /*
1493 : * We need to update both the slot and the subscription
1494 : * for the two_phase option. We can enable the two_phase
1495 : * option for a slot only once the initial data
1496 : * synchronization is done. This is to avoid missing some
1497 : * data as explained in comments atop worker.c.
1498 : */
1499 6 : update_two_phase = !opts.twophase;
1500 :
1501 6 : CheckAlterSubOption(sub, "two_phase", update_two_phase,
1502 : isTopLevel);
1503 :
1504 : /*
1505 : * Modifying the two_phase slot option requires a slot
1506 : * lookup by slot name, so changing the slot name at the
1507 : * same time is not allowed.
1508 : */
1509 6 : if (update_two_phase &&
1510 2 : IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1511 0 : ereport(ERROR,
1512 : (errcode(ERRCODE_SYNTAX_ERROR),
1513 : errmsg("\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
1514 :
1515 : /*
1516 : * Note that workers may still survive even if the
1517 : * subscription has been disabled.
1518 : *
1519 : * Ensure workers have already been exited to avoid
1520 : * getting prepared transactions while we are disabling
1521 : * the two_phase option. Otherwise, the changes of an
1522 : * already prepared transaction can be replicated again
1523 : * along with its corresponding commit, leading to
1524 : * duplicate data or errors.
1525 : */
1526 6 : if (logicalrep_workers_find(subid, true, true))
1527 0 : ereport(ERROR,
1528 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1529 : errmsg("cannot alter \"two_phase\" when logical replication worker is still running"),
1530 : errhint("Try again after some time.")));
1531 :
1532 : /*
1533 : * two_phase cannot be disabled if there are any
1534 : * uncommitted prepared transactions present otherwise it
1535 : * can lead to duplicate data or errors as explained in
1536 : * the comment above.
1537 : */
1538 6 : if (update_two_phase &&
1539 2 : sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
1540 2 : LookupGXactBySubid(subid))
1541 0 : ereport(ERROR,
1542 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1543 : errmsg("cannot disable \"two_phase\" when prepared transactions exist"),
1544 : errhint("Resolve these transactions and try again.")));
1545 :
1546 : /* Change system catalog accordingly */
1547 6 : values[Anum_pg_subscription_subtwophasestate - 1] =
1548 6 : CharGetDatum(opts.twophase ?
1549 : LOGICALREP_TWOPHASE_STATE_PENDING :
1550 : LOGICALREP_TWOPHASE_STATE_DISABLED);
1551 6 : replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1552 : }
1553 :
1554 200 : if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
1555 : {
1556 : /*
1557 : * Similar to the two_phase case above, we need to update
1558 : * the failover option for both the slot and the
1559 : * subscription.
1560 : */
1561 16 : update_failover = true;
1562 :
1563 16 : CheckAlterSubOption(sub, "failover", update_failover,
1564 : isTopLevel);
1565 :
1566 8 : values[Anum_pg_subscription_subfailover - 1] =
1567 8 : BoolGetDatum(opts.failover);
1568 8 : replaces[Anum_pg_subscription_subfailover - 1] = true;
1569 : }
1570 :
1571 192 : if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
1572 : {
1573 4 : values[Anum_pg_subscription_subretaindeadtuples - 1] =
1574 4 : BoolGetDatum(opts.retaindeadtuples);
1575 4 : replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true;
1576 :
1577 : /*
1578 : * Update the retention status only if there's a change in
1579 : * the retain_dead_tuples option value.
1580 : *
1581 : * Automatically marking retention as active when
1582 : * retain_dead_tuples is enabled may not always be ideal,
1583 : * especially if retention was previously stopped and the
1584 : * user toggles retain_dead_tuples without adjusting the
1585 : * publisher workload. However, this behavior provides a
1586 : * convenient way for users to manually refresh the
1587 : * retention status. Since retention will be stopped again
1588 : * unless the publisher workload is reduced, this approach
1589 : * is acceptable for now.
1590 : */
1591 4 : if (opts.retaindeadtuples != sub->retaindeadtuples)
1592 : {
1593 4 : values[Anum_pg_subscription_subretentionactive - 1] =
1594 4 : BoolGetDatum(opts.retaindeadtuples);
1595 4 : replaces[Anum_pg_subscription_subretentionactive - 1] = true;
1596 :
1597 4 : retention_active = opts.retaindeadtuples;
1598 : }
1599 :
1600 4 : CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
1601 :
1602 : /*
1603 : * Workers may continue running even after the
1604 : * subscription has been disabled.
1605 : *
1606 : * To prevent race conditions (as described in
1607 : * CheckAlterSubOption()), ensure that all worker
1608 : * processes have already exited before proceeding.
1609 : */
1610 2 : if (logicalrep_workers_find(subid, true, true))
1611 0 : ereport(ERROR,
1612 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1613 : errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
1614 : errhint("Try again after some time.")));
1615 :
1616 : /*
1617 : * Notify the launcher to manage the replication slot for
1618 : * conflict detection. This ensures that replication slot
1619 : * is efficiently handled (created, updated, or dropped)
1620 : * in response to any configuration changes.
1621 : */
1622 2 : ApplyLauncherWakeupAtCommit();
1623 :
1624 2 : check_pub_rdt = opts.retaindeadtuples;
1625 2 : retain_dead_tuples = opts.retaindeadtuples;
1626 : }
1627 :
1628 190 : if (IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1629 : {
1630 10 : values[Anum_pg_subscription_submaxretention - 1] =
1631 10 : Int32GetDatum(opts.maxretention);
1632 10 : replaces[Anum_pg_subscription_submaxretention - 1] = true;
1633 :
1634 10 : max_retention = opts.maxretention;
1635 : }
1636 :
1637 : /*
1638 : * Ensure that system configuration paramters are set
1639 : * appropriately to support retain_dead_tuples and
1640 : * max_retention_duration.
1641 : */
1642 190 : if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) ||
1643 188 : IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1644 12 : CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE,
1645 : retain_dead_tuples,
1646 : retention_active,
1647 12 : (max_retention > 0));
1648 :
1649 190 : if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
1650 : {
1651 10 : values[Anum_pg_subscription_suborigin - 1] =
1652 10 : CStringGetTextDatum(opts.origin);
1653 10 : replaces[Anum_pg_subscription_suborigin - 1] = true;
1654 :
1655 : /*
1656 : * Check if changes from different origins may be received
1657 : * from the publisher when the origin is changed to ANY
1658 : * and retain_dead_tuples is enabled.
1659 : */
1660 14 : check_pub_rdt = retain_dead_tuples &&
1661 4 : pg_strcasecmp(opts.origin, LOGICALREP_ORIGIN_ANY) == 0;
1662 :
1663 10 : origin = opts.origin;
1664 : }
1665 :
1666 190 : update_tuple = true;
1667 190 : break;
1668 : }
1669 :
1670 104 : case ALTER_SUBSCRIPTION_ENABLED:
1671 : {
1672 104 : parse_subscription_options(pstate, stmt->options,
1673 : SUBOPT_ENABLED, &opts);
1674 : Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
1675 :
1676 104 : if (!sub->slotname && opts.enabled)
1677 6 : ereport(ERROR,
1678 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1679 : errmsg("cannot enable subscription that does not have a slot name")));
1680 :
1681 : /*
1682 : * Check track_commit_timestamp only when enabling the
1683 : * subscription in case it was disabled after creation. See
1684 : * comments atop CheckSubDeadTupleRetention() for details.
1685 : */
1686 98 : CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
1687 98 : WARNING, sub->retaindeadtuples,
1688 98 : sub->retentionactive, false);
1689 :
1690 98 : values[Anum_pg_subscription_subenabled - 1] =
1691 98 : BoolGetDatum(opts.enabled);
1692 98 : replaces[Anum_pg_subscription_subenabled - 1] = true;
1693 :
1694 98 : if (opts.enabled)
1695 54 : ApplyLauncherWakeupAtCommit();
1696 :
1697 98 : update_tuple = true;
1698 :
1699 : /*
1700 : * The subscription might be initially created with
1701 : * connect=false and retain_dead_tuples=true, meaning the
1702 : * remote server's status may not be checked. Ensure this
1703 : * check is conducted now.
1704 : */
1705 98 : check_pub_rdt = sub->retaindeadtuples && opts.enabled;
1706 98 : break;
1707 : }
1708 :
1709 20 : case ALTER_SUBSCRIPTION_CONNECTION:
1710 : /* Load the library providing us libpq calls. */
1711 20 : load_file("libpqwalreceiver", false);
1712 : /* Check the connection info string. */
1713 20 : walrcv_check_conninfo(stmt->conninfo,
1714 : sub->passwordrequired && !sub->ownersuperuser);
1715 :
1716 14 : values[Anum_pg_subscription_subconninfo - 1] =
1717 14 : CStringGetTextDatum(stmt->conninfo);
1718 14 : replaces[Anum_pg_subscription_subconninfo - 1] = true;
1719 14 : update_tuple = true;
1720 :
1721 : /*
1722 : * Since the remote server configuration might have changed,
1723 : * perform a check to ensure it permits enabling
1724 : * retain_dead_tuples.
1725 : */
1726 14 : check_pub_rdt = sub->retaindeadtuples;
1727 14 : break;
1728 :
1729 32 : case ALTER_SUBSCRIPTION_SET_PUBLICATION:
1730 : {
1731 32 : supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
1732 32 : parse_subscription_options(pstate, stmt->options,
1733 : supported_opts, &opts);
1734 :
1735 32 : values[Anum_pg_subscription_subpublications - 1] =
1736 32 : publicationListToArray(stmt->publication);
1737 32 : replaces[Anum_pg_subscription_subpublications - 1] = true;
1738 :
1739 32 : update_tuple = true;
1740 :
1741 : /* Refresh if user asked us to. */
1742 32 : if (opts.refresh)
1743 : {
1744 26 : if (!sub->enabled)
1745 0 : ereport(ERROR,
1746 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1747 : errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1748 : errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
1749 :
1750 : /*
1751 : * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
1752 : * why this is not allowed.
1753 : */
1754 26 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1755 0 : ereport(ERROR,
1756 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1757 : errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1758 : errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1759 :
1760 26 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1761 :
1762 : /* Make sure refresh sees the new list of publications. */
1763 14 : sub->publications = stmt->publication;
1764 :
1765 14 : AlterSubscription_refresh(sub, opts.copy_data,
1766 : stmt->publication);
1767 : }
1768 :
1769 20 : break;
1770 : }
1771 :
1772 54 : case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
1773 : case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
1774 : {
1775 : List *publist;
1776 54 : bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
1777 :
1778 54 : supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
1779 54 : parse_subscription_options(pstate, stmt->options,
1780 : supported_opts, &opts);
1781 :
1782 54 : publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
1783 18 : values[Anum_pg_subscription_subpublications - 1] =
1784 18 : publicationListToArray(publist);
1785 18 : replaces[Anum_pg_subscription_subpublications - 1] = true;
1786 :
1787 18 : update_tuple = true;
1788 :
1789 : /* Refresh if user asked us to. */
1790 18 : if (opts.refresh)
1791 : {
1792 : /* We only need to validate user specified publications. */
1793 6 : List *validate_publications = (isadd) ? stmt->publication : NULL;
1794 :
1795 6 : if (!sub->enabled)
1796 0 : ereport(ERROR,
1797 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1798 : errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1799 : /* translator: %s is an SQL ALTER command */
1800 : errhint("Use %s instead.",
1801 : isadd ?
1802 : "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
1803 : "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
1804 :
1805 : /*
1806 : * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
1807 : * why this is not allowed.
1808 : */
1809 6 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1810 0 : ereport(ERROR,
1811 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1812 : errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1813 : /* translator: %s is an SQL ALTER command */
1814 : errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
1815 : isadd ?
1816 : "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
1817 : "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
1818 :
1819 6 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1820 :
1821 : /* Refresh the new list of publications. */
1822 6 : sub->publications = publist;
1823 :
1824 6 : AlterSubscription_refresh(sub, opts.copy_data,
1825 : validate_publications);
1826 : }
1827 :
1828 18 : break;
1829 : }
1830 :
1831 62 : case ALTER_SUBSCRIPTION_REFRESH_PUBLICATION:
1832 : {
1833 62 : if (!sub->enabled)
1834 6 : ereport(ERROR,
1835 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1836 : errmsg("%s is not allowed for disabled subscriptions",
1837 : "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
1838 :
1839 56 : parse_subscription_options(pstate, stmt->options,
1840 : SUBOPT_COPY_DATA, &opts);
1841 :
1842 : /*
1843 : * The subscription option "two_phase" requires that
1844 : * replication has passed the initial table synchronization
1845 : * phase before the two_phase becomes properly enabled.
1846 : *
1847 : * But, having reached this two-phase commit "enabled" state
1848 : * we must not allow any subsequent table initialization to
1849 : * occur. So the ALTER SUBSCRIPTION ... REFRESH PUBLICATION is
1850 : * disallowed when the user had requested two_phase = on mode.
1851 : *
1852 : * The exception to this restriction is when copy_data =
1853 : * false, because when copy_data is false the tablesync will
1854 : * start already in READY state and will exit directly without
1855 : * doing anything.
1856 : *
1857 : * For more details see comments atop worker.c.
1858 : */
1859 56 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1860 0 : ereport(ERROR,
1861 : (errcode(ERRCODE_SYNTAX_ERROR),
1862 : errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"),
1863 : errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1864 :
1865 56 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION");
1866 :
1867 50 : AlterSubscription_refresh(sub, opts.copy_data, NULL);
1868 :
1869 48 : break;
1870 : }
1871 :
1872 0 : case ALTER_SUBSCRIPTION_REFRESH_SEQUENCES:
1873 : {
1874 0 : if (!sub->enabled)
1875 0 : ereport(ERROR,
1876 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1877 : errmsg("%s is not allowed for disabled subscriptions",
1878 : "ALTER SUBSCRIPTION ... REFRESH SEQUENCES"));
1879 :
1880 0 : AlterSubscription_refresh_seq(sub);
1881 :
1882 0 : break;
1883 : }
1884 :
1885 24 : case ALTER_SUBSCRIPTION_SKIP:
1886 : {
1887 24 : parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
1888 :
1889 : /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
1890 : Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
1891 :
1892 : /*
1893 : * If the user sets subskiplsn, we do a sanity check to make
1894 : * sure that the specified LSN is a probable value.
1895 : */
1896 18 : if (!XLogRecPtrIsInvalid(opts.lsn))
1897 : {
1898 : RepOriginId originid;
1899 : char originname[NAMEDATALEN];
1900 : XLogRecPtr remote_lsn;
1901 :
1902 12 : ReplicationOriginNameForLogicalRep(subid, InvalidOid,
1903 : originname, sizeof(originname));
1904 12 : originid = replorigin_by_name(originname, false);
1905 12 : remote_lsn = replorigin_get_progress(originid, false);
1906 :
1907 : /* Check the given LSN is at least a future LSN */
1908 12 : if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn)
1909 0 : ereport(ERROR,
1910 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1911 : errmsg("skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
1912 : LSN_FORMAT_ARGS(opts.lsn),
1913 : LSN_FORMAT_ARGS(remote_lsn))));
1914 : }
1915 :
1916 18 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
1917 18 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
1918 :
1919 18 : update_tuple = true;
1920 18 : break;
1921 : }
1922 :
1923 0 : default:
1924 0 : elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
1925 : stmt->kind);
1926 : }
1927 :
1928 : /* Update the catalog if needed. */
1929 406 : if (update_tuple)
1930 : {
1931 358 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1932 : replaces);
1933 :
1934 358 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1935 :
1936 358 : heap_freetuple(tup);
1937 : }
1938 :
1939 : /*
1940 : * Try to acquire the connection necessary either for modifying the slot
1941 : * or for checking if the remote server permits enabling
1942 : * retain_dead_tuples.
1943 : *
1944 : * This has to be at the end because otherwise if there is an error while
1945 : * doing the database operations we won't be able to rollback altered
1946 : * slot.
1947 : */
1948 406 : if (update_failover || update_two_phase || check_pub_rdt)
1949 : {
1950 : bool must_use_password;
1951 : char *err;
1952 : WalReceiverConn *wrconn;
1953 :
1954 : /* Load the library providing us libpq calls. */
1955 26 : load_file("libpqwalreceiver", false);
1956 :
1957 : /*
1958 : * Try to connect to the publisher, using the new connection string if
1959 : * available.
1960 : */
1961 26 : must_use_password = sub->passwordrequired && !sub->ownersuperuser;
1962 26 : wrconn = walrcv_connect(stmt->conninfo ? stmt->conninfo : sub->conninfo,
1963 : true, true, must_use_password, sub->name,
1964 : &err);
1965 26 : if (!wrconn)
1966 0 : ereport(ERROR,
1967 : (errcode(ERRCODE_CONNECTION_FAILURE),
1968 : errmsg("subscription \"%s\" could not connect to the publisher: %s",
1969 : sub->name, err)));
1970 :
1971 26 : PG_TRY();
1972 : {
1973 26 : if (retain_dead_tuples)
1974 18 : check_pub_dead_tuple_retention(wrconn);
1975 :
1976 26 : check_publications_origin_tables(wrconn, sub->publications, false,
1977 : retain_dead_tuples, origin, NULL, 0,
1978 : sub->name);
1979 :
1980 26 : if (update_failover || update_two_phase)
1981 10 : walrcv_alter_slot(wrconn, sub->slotname,
1982 : update_failover ? &opts.failover : NULL,
1983 : update_two_phase ? &opts.twophase : NULL);
1984 : }
1985 0 : PG_FINALLY();
1986 : {
1987 26 : walrcv_disconnect(wrconn);
1988 : }
1989 26 : PG_END_TRY();
1990 : }
1991 :
1992 406 : table_close(rel, RowExclusiveLock);
1993 :
1994 406 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
1995 :
1996 406 : InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
1997 :
1998 : /* Wake up related replication workers to handle this change quickly. */
1999 406 : LogicalRepWorkersWakeupAtCommit(subid);
2000 :
2001 406 : return myself;
2002 : }
2003 :
2004 : /*
2005 : * Drop a subscription
2006 : */
2007 : void
2008 248 : DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
2009 : {
2010 : Relation rel;
2011 : ObjectAddress myself;
2012 : HeapTuple tup;
2013 : Oid subid;
2014 : Oid subowner;
2015 : Datum datum;
2016 : bool isnull;
2017 : char *subname;
2018 : char *conninfo;
2019 : char *slotname;
2020 : List *subworkers;
2021 : ListCell *lc;
2022 : char originname[NAMEDATALEN];
2023 248 : char *err = NULL;
2024 : WalReceiverConn *wrconn;
2025 : Form_pg_subscription form;
2026 : List *rstates;
2027 : bool must_use_password;
2028 :
2029 : /*
2030 : * The launcher may concurrently start a new worker for this subscription.
2031 : * During initialization, the worker checks for subscription validity and
2032 : * exits if the subscription has already been dropped. See
2033 : * InitializeLogRepWorker.
2034 : */
2035 248 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
2036 :
2037 248 : tup = SearchSysCache2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
2038 248 : CStringGetDatum(stmt->subname));
2039 :
2040 248 : if (!HeapTupleIsValid(tup))
2041 : {
2042 12 : table_close(rel, NoLock);
2043 :
2044 12 : if (!stmt->missing_ok)
2045 6 : ereport(ERROR,
2046 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2047 : errmsg("subscription \"%s\" does not exist",
2048 : stmt->subname)));
2049 : else
2050 6 : ereport(NOTICE,
2051 : (errmsg("subscription \"%s\" does not exist, skipping",
2052 : stmt->subname)));
2053 :
2054 90 : return;
2055 : }
2056 :
2057 236 : form = (Form_pg_subscription) GETSTRUCT(tup);
2058 236 : subid = form->oid;
2059 236 : subowner = form->subowner;
2060 236 : must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
2061 :
2062 : /* must be owner */
2063 236 : if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
2064 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
2065 0 : stmt->subname);
2066 :
2067 : /* DROP hook for the subscription being removed */
2068 236 : InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
2069 :
2070 : /*
2071 : * Lock the subscription so nobody else can do anything with it (including
2072 : * the replication workers).
2073 : */
2074 236 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
2075 :
2076 : /* Get subname */
2077 236 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
2078 : Anum_pg_subscription_subname);
2079 236 : subname = pstrdup(NameStr(*DatumGetName(datum)));
2080 :
2081 : /* Get conninfo */
2082 236 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
2083 : Anum_pg_subscription_subconninfo);
2084 236 : conninfo = TextDatumGetCString(datum);
2085 :
2086 : /* Get slotname */
2087 236 : datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
2088 : Anum_pg_subscription_subslotname, &isnull);
2089 236 : if (!isnull)
2090 150 : slotname = pstrdup(NameStr(*DatumGetName(datum)));
2091 : else
2092 86 : slotname = NULL;
2093 :
2094 : /*
2095 : * Since dropping a replication slot is not transactional, the replication
2096 : * slot stays dropped even if the transaction rolls back. So we cannot
2097 : * run DROP SUBSCRIPTION inside a transaction block if dropping the
2098 : * replication slot. Also, in this case, we report a message for dropping
2099 : * the subscription to the cumulative stats system.
2100 : *
2101 : * XXX The command name should really be something like "DROP SUBSCRIPTION
2102 : * of a subscription that is associated with a replication slot", but we
2103 : * don't have the proper facilities for that.
2104 : */
2105 236 : if (slotname)
2106 150 : PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
2107 :
2108 230 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
2109 230 : EventTriggerSQLDropAddObject(&myself, true, true);
2110 :
2111 : /* Remove the tuple from catalog. */
2112 230 : CatalogTupleDelete(rel, &tup->t_self);
2113 :
2114 230 : ReleaseSysCache(tup);
2115 :
2116 : /*
2117 : * Stop all the subscription workers immediately.
2118 : *
2119 : * This is necessary if we are dropping the replication slot, so that the
2120 : * slot becomes accessible.
2121 : *
2122 : * It is also necessary if the subscription is disabled and was disabled
2123 : * in the same transaction. Then the workers haven't seen the disabling
2124 : * yet and will still be running, leading to hangs later when we want to
2125 : * drop the replication origin. If the subscription was disabled before
2126 : * this transaction, then there shouldn't be any workers left, so this
2127 : * won't make a difference.
2128 : *
2129 : * New workers won't be started because we hold an exclusive lock on the
2130 : * subscription till the end of the transaction.
2131 : */
2132 230 : subworkers = logicalrep_workers_find(subid, false, true);
2133 382 : foreach(lc, subworkers)
2134 : {
2135 152 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
2136 :
2137 152 : logicalrep_worker_stop(w->type, w->subid, w->relid);
2138 : }
2139 230 : list_free(subworkers);
2140 :
2141 : /*
2142 : * Remove the no-longer-useful entry in the launcher's table of apply
2143 : * worker start times.
2144 : *
2145 : * If this transaction rolls back, the launcher might restart a failed
2146 : * apply worker before wal_retrieve_retry_interval milliseconds have
2147 : * elapsed, but that's pretty harmless.
2148 : */
2149 230 : ApplyLauncherForgetWorkerStartTime(subid);
2150 :
2151 : /*
2152 : * Cleanup of tablesync replication origins.
2153 : *
2154 : * Any READY-state relations would already have dealt with clean-ups.
2155 : *
2156 : * Note that the state can't change because we have already stopped both
2157 : * the apply and tablesync workers and they can't restart because of
2158 : * exclusive lock on the subscription.
2159 : */
2160 230 : rstates = GetSubscriptionRelations(subid, true, false, true);
2161 244 : foreach(lc, rstates)
2162 : {
2163 14 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
2164 14 : Oid relid = rstate->relid;
2165 :
2166 : /* Only cleanup resources of tablesync workers */
2167 14 : if (!OidIsValid(relid))
2168 0 : continue;
2169 :
2170 : /*
2171 : * Drop the tablesync's origin tracking if exists.
2172 : *
2173 : * It is possible that the origin is not yet created for tablesync
2174 : * worker so passing missing_ok = true. This can happen for the states
2175 : * before SUBREL_STATE_FINISHEDCOPY.
2176 : */
2177 14 : ReplicationOriginNameForLogicalRep(subid, relid, originname,
2178 : sizeof(originname));
2179 14 : replorigin_drop_by_name(originname, true, false);
2180 : }
2181 :
2182 : /* Clean up dependencies */
2183 230 : deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
2184 :
2185 : /* Remove any associated relation synchronization states. */
2186 230 : RemoveSubscriptionRel(subid, InvalidOid);
2187 :
2188 : /* Remove the origin tracking if exists. */
2189 230 : ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
2190 230 : replorigin_drop_by_name(originname, true, false);
2191 :
2192 : /*
2193 : * Tell the cumulative stats system that the subscription is getting
2194 : * dropped.
2195 : */
2196 230 : pgstat_drop_subscription(subid);
2197 :
2198 : /*
2199 : * If there is no slot associated with the subscription, we can finish
2200 : * here.
2201 : */
2202 230 : if (!slotname && rstates == NIL)
2203 : {
2204 84 : table_close(rel, NoLock);
2205 84 : return;
2206 : }
2207 :
2208 : /*
2209 : * Try to acquire the connection necessary for dropping slots.
2210 : *
2211 : * Note: If the slotname is NONE/NULL then we allow the command to finish
2212 : * and users need to manually cleanup the apply and tablesync worker slots
2213 : * later.
2214 : *
2215 : * This has to be at the end because otherwise if there is an error while
2216 : * doing the database operations we won't be able to rollback dropped
2217 : * slot.
2218 : */
2219 146 : load_file("libpqwalreceiver", false);
2220 :
2221 146 : wrconn = walrcv_connect(conninfo, true, true, must_use_password,
2222 : subname, &err);
2223 146 : if (wrconn == NULL)
2224 : {
2225 0 : if (!slotname)
2226 : {
2227 : /* be tidy */
2228 0 : list_free(rstates);
2229 0 : table_close(rel, NoLock);
2230 0 : return;
2231 : }
2232 : else
2233 : {
2234 0 : ReportSlotConnectionError(rstates, subid, slotname, err);
2235 : }
2236 : }
2237 :
2238 146 : PG_TRY();
2239 : {
2240 160 : foreach(lc, rstates)
2241 : {
2242 14 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
2243 14 : Oid relid = rstate->relid;
2244 :
2245 : /* Only cleanup resources of tablesync workers */
2246 14 : if (!OidIsValid(relid))
2247 0 : continue;
2248 :
2249 : /*
2250 : * Drop the tablesync slots associated with removed tables.
2251 : *
2252 : * For SYNCDONE/READY states, the tablesync slot is known to have
2253 : * already been dropped by the tablesync worker.
2254 : *
2255 : * For other states, there is no certainty, maybe the slot does
2256 : * not exist yet. Also, if we fail after removing some of the
2257 : * slots, next time, it will again try to drop already dropped
2258 : * slots and fail. For these reasons, we allow missing_ok = true
2259 : * for the drop.
2260 : */
2261 14 : if (rstate->state != SUBREL_STATE_SYNCDONE)
2262 : {
2263 10 : char syncslotname[NAMEDATALEN] = {0};
2264 :
2265 10 : ReplicationSlotNameForTablesync(subid, relid, syncslotname,
2266 : sizeof(syncslotname));
2267 10 : ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
2268 : }
2269 : }
2270 :
2271 146 : list_free(rstates);
2272 :
2273 : /*
2274 : * If there is a slot associated with the subscription, then drop the
2275 : * replication slot at the publisher.
2276 : */
2277 146 : if (slotname)
2278 144 : ReplicationSlotDropAtPubNode(wrconn, slotname, false);
2279 : }
2280 2 : PG_FINALLY();
2281 : {
2282 146 : walrcv_disconnect(wrconn);
2283 : }
2284 146 : PG_END_TRY();
2285 :
2286 144 : table_close(rel, NoLock);
2287 : }
2288 :
2289 : /*
2290 : * Drop the replication slot at the publisher node using the replication
2291 : * connection.
2292 : *
2293 : * missing_ok - if true then only issue a LOG message if the slot doesn't
2294 : * exist.
2295 : */
2296 : void
2297 536 : ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
2298 : {
2299 : StringInfoData cmd;
2300 :
2301 : Assert(wrconn);
2302 :
2303 536 : load_file("libpqwalreceiver", false);
2304 :
2305 536 : initStringInfo(&cmd);
2306 536 : appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
2307 :
2308 536 : PG_TRY();
2309 : {
2310 : WalRcvExecResult *res;
2311 :
2312 536 : res = walrcv_exec(wrconn, cmd.data, 0, NULL);
2313 :
2314 536 : if (res->status == WALRCV_OK_COMMAND)
2315 : {
2316 : /* NOTICE. Success. */
2317 532 : ereport(NOTICE,
2318 : (errmsg("dropped replication slot \"%s\" on publisher",
2319 : slotname)));
2320 : }
2321 4 : else if (res->status == WALRCV_ERROR &&
2322 2 : missing_ok &&
2323 2 : res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
2324 : {
2325 : /* LOG. Error, but missing_ok = true. */
2326 2 : ereport(LOG,
2327 : (errmsg("could not drop replication slot \"%s\" on publisher: %s",
2328 : slotname, res->err)));
2329 : }
2330 : else
2331 : {
2332 : /* ERROR. */
2333 2 : ereport(ERROR,
2334 : (errcode(ERRCODE_CONNECTION_FAILURE),
2335 : errmsg("could not drop replication slot \"%s\" on publisher: %s",
2336 : slotname, res->err)));
2337 : }
2338 :
2339 534 : walrcv_clear_result(res);
2340 : }
2341 2 : PG_FINALLY();
2342 : {
2343 536 : pfree(cmd.data);
2344 : }
2345 536 : PG_END_TRY();
2346 534 : }
2347 :
2348 : /*
2349 : * Internal workhorse for changing a subscription owner
2350 : */
2351 : static void
2352 18 : AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
2353 : {
2354 : Form_pg_subscription form;
2355 : AclResult aclresult;
2356 :
2357 18 : form = (Form_pg_subscription) GETSTRUCT(tup);
2358 :
2359 18 : if (form->subowner == newOwnerId)
2360 4 : return;
2361 :
2362 14 : if (!object_ownercheck(SubscriptionRelationId, form->oid, GetUserId()))
2363 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
2364 0 : NameStr(form->subname));
2365 :
2366 : /*
2367 : * Don't allow non-superuser modification of a subscription with
2368 : * password_required=false.
2369 : */
2370 14 : if (!form->subpasswordrequired && !superuser())
2371 0 : ereport(ERROR,
2372 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2373 : errmsg("password_required=false is superuser-only"),
2374 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
2375 :
2376 : /* Must be able to become new owner */
2377 14 : check_can_set_role(GetUserId(), newOwnerId);
2378 :
2379 : /*
2380 : * current owner must have CREATE on database
2381 : *
2382 : * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
2383 : * other object types behave differently (e.g. you can't give a table to a
2384 : * user who lacks CREATE privileges on a schema).
2385 : */
2386 8 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
2387 : GetUserId(), ACL_CREATE);
2388 8 : if (aclresult != ACLCHECK_OK)
2389 0 : aclcheck_error(aclresult, OBJECT_DATABASE,
2390 0 : get_database_name(MyDatabaseId));
2391 :
2392 8 : form->subowner = newOwnerId;
2393 8 : CatalogTupleUpdate(rel, &tup->t_self, tup);
2394 :
2395 : /* Update owner dependency reference */
2396 8 : changeDependencyOnOwner(SubscriptionRelationId,
2397 : form->oid,
2398 : newOwnerId);
2399 :
2400 8 : InvokeObjectPostAlterHook(SubscriptionRelationId,
2401 : form->oid, 0);
2402 :
2403 : /* Wake up related background processes to handle this change quickly. */
2404 8 : ApplyLauncherWakeupAtCommit();
2405 8 : LogicalRepWorkersWakeupAtCommit(form->oid);
2406 : }
2407 :
2408 : /*
2409 : * Change subscription owner -- by name
2410 : */
2411 : ObjectAddress
2412 18 : AlterSubscriptionOwner(const char *name, Oid newOwnerId)
2413 : {
2414 : Oid subid;
2415 : HeapTuple tup;
2416 : Relation rel;
2417 : ObjectAddress address;
2418 : Form_pg_subscription form;
2419 :
2420 18 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
2421 :
2422 18 : tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
2423 : CStringGetDatum(name));
2424 :
2425 18 : if (!HeapTupleIsValid(tup))
2426 0 : ereport(ERROR,
2427 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2428 : errmsg("subscription \"%s\" does not exist", name)));
2429 :
2430 18 : form = (Form_pg_subscription) GETSTRUCT(tup);
2431 18 : subid = form->oid;
2432 :
2433 18 : AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
2434 :
2435 12 : ObjectAddressSet(address, SubscriptionRelationId, subid);
2436 :
2437 12 : heap_freetuple(tup);
2438 :
2439 12 : table_close(rel, RowExclusiveLock);
2440 :
2441 12 : return address;
2442 : }
2443 :
2444 : /*
2445 : * Change subscription owner -- by OID
2446 : */
2447 : void
2448 0 : AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
2449 : {
2450 : HeapTuple tup;
2451 : Relation rel;
2452 :
2453 0 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
2454 :
2455 0 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
2456 :
2457 0 : if (!HeapTupleIsValid(tup))
2458 0 : ereport(ERROR,
2459 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2460 : errmsg("subscription with OID %u does not exist", subid)));
2461 :
2462 0 : AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
2463 :
2464 0 : heap_freetuple(tup);
2465 :
2466 0 : table_close(rel, RowExclusiveLock);
2467 0 : }
2468 :
2469 : /*
2470 : * Check and log a warning if the publisher has subscribed to the same table,
2471 : * its partition ancestors (if it's a partition), or its partition children (if
2472 : * it's a partitioned table), from some other publishers. This check is
2473 : * required in the following scenarios:
2474 : *
2475 : * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
2476 : * statements with "copy_data = true" and "origin = none":
2477 : * - Warn the user that data with an origin might have been copied.
2478 : * - This check is skipped for tables already added, as incremental sync via
2479 : * WAL allows origin tracking. The list of such tables is in
2480 : * subrel_local_oids.
2481 : *
2482 : * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
2483 : * statements with "retain_dead_tuples = true" and "origin = any", and for
2484 : * ALTER SUBSCRIPTION statements that modify retain_dead_tuples or origin,
2485 : * or when the publisher's status changes (e.g., due to a connection string
2486 : * update):
2487 : * - Warn the user that only conflict detection info for local changes on
2488 : * the publisher is retained. Data from other origins may lack sufficient
2489 : * details for reliable conflict detection.
2490 : * - See comments atop worker.c for more details.
2491 : */
2492 : static void
2493 332 : check_publications_origin_tables(WalReceiverConn *wrconn, List *publications,
2494 : bool copydata, bool retain_dead_tuples,
2495 : char *origin, Oid *subrel_local_oids,
2496 : int subrel_count, char *subname)
2497 : {
2498 : WalRcvExecResult *res;
2499 : StringInfoData cmd;
2500 : TupleTableSlot *slot;
2501 332 : Oid tableRow[1] = {TEXTOID};
2502 332 : List *publist = NIL;
2503 : int i;
2504 : bool check_rdt;
2505 : bool check_table_sync;
2506 664 : bool origin_none = origin &&
2507 332 : pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0;
2508 :
2509 : /*
2510 : * Enable retain_dead_tuples checks only when origin is set to 'any',
2511 : * since with origin='none' only local changes are replicated to the
2512 : * subscriber.
2513 : */
2514 332 : check_rdt = retain_dead_tuples && !origin_none;
2515 :
2516 : /*
2517 : * Enable table synchronization checks only when origin is 'none', to
2518 : * ensure that data from other origins is not inadvertently copied.
2519 : */
2520 332 : check_table_sync = copydata && origin_none;
2521 :
2522 : /* retain_dead_tuples and table sync checks occur separately */
2523 : Assert(!(check_rdt && check_table_sync));
2524 :
2525 : /* Return if no checks are required */
2526 332 : if (!check_rdt && !check_table_sync)
2527 304 : return;
2528 :
2529 28 : initStringInfo(&cmd);
2530 28 : appendStringInfoString(&cmd,
2531 : "SELECT DISTINCT P.pubname AS pubname\n"
2532 : "FROM pg_publication P,\n"
2533 : " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
2534 : " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid OR"
2535 : " GPT.relid IN (SELECT relid FROM pg_partition_ancestors(PS.srrelid) UNION"
2536 : " SELECT relid FROM pg_partition_tree(PS.srrelid))),\n"
2537 : " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2538 : "WHERE C.oid = GPT.relid AND P.pubname IN (");
2539 28 : GetPublicationsStr(publications, &cmd, true);
2540 28 : appendStringInfoString(&cmd, ")\n");
2541 :
2542 : /*
2543 : * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
2544 : * subrel_local_oids contains the list of relation oids that are already
2545 : * present on the subscriber. This check should be skipped for these
2546 : * tables if checking for table sync scenario. However, when handling the
2547 : * retain_dead_tuples scenario, ensure all tables are checked, as some
2548 : * existing tables may now include changes from other origins due to newly
2549 : * created subscriptions on the publisher.
2550 : */
2551 28 : if (check_table_sync)
2552 : {
2553 28 : for (i = 0; i < subrel_count; i++)
2554 : {
2555 8 : Oid relid = subrel_local_oids[i];
2556 8 : char *schemaname = get_namespace_name(get_rel_namespace(relid));
2557 8 : char *tablename = get_rel_name(relid);
2558 :
2559 8 : appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
2560 : schemaname, tablename);
2561 : }
2562 : }
2563 :
2564 28 : res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
2565 28 : pfree(cmd.data);
2566 :
2567 28 : if (res->status != WALRCV_OK_TUPLES)
2568 0 : ereport(ERROR,
2569 : (errcode(ERRCODE_CONNECTION_FAILURE),
2570 : errmsg("could not receive list of replicated tables from the publisher: %s",
2571 : res->err)));
2572 :
2573 : /* Process publications. */
2574 28 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2575 38 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2576 : {
2577 : char *pubname;
2578 : bool isnull;
2579 :
2580 10 : pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2581 : Assert(!isnull);
2582 :
2583 10 : ExecClearTuple(slot);
2584 10 : publist = list_append_unique(publist, makeString(pubname));
2585 : }
2586 :
2587 : /*
2588 : * Log a warning if the publisher has subscribed to the same table from
2589 : * some other publisher. We cannot know the origin of data during the
2590 : * initial sync. Data origins can be found only from the WAL by looking at
2591 : * the origin id.
2592 : *
2593 : * XXX: For simplicity, we don't check whether the table has any data or
2594 : * not. If the table doesn't have any data then we don't need to
2595 : * distinguish between data having origin and data not having origin so we
2596 : * can avoid logging a warning for table sync scenario.
2597 : */
2598 28 : if (publist)
2599 : {
2600 10 : StringInfo pubnames = makeStringInfo();
2601 10 : StringInfo err_msg = makeStringInfo();
2602 10 : StringInfo err_hint = makeStringInfo();
2603 :
2604 : /* Prepare the list of publication(s) for warning message. */
2605 10 : GetPublicationsStr(publist, pubnames, false);
2606 :
2607 10 : if (check_table_sync)
2608 : {
2609 8 : appendStringInfo(err_msg, _("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin"),
2610 : subname);
2611 8 : appendStringInfoString(err_hint, _("Verify that initial data copied from the publisher tables did not come from other origins."));
2612 : }
2613 : else
2614 : {
2615 2 : appendStringInfo(err_msg, _("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins"),
2616 : subname);
2617 2 : appendStringInfoString(err_hint, _("Consider using origin = NONE or disabling retain_dead_tuples."));
2618 : }
2619 :
2620 10 : ereport(WARNING,
2621 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2622 : errmsg_internal("%s", err_msg->data),
2623 : errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2624 : "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2625 : list_length(publist), pubnames->data),
2626 : errhint_internal("%s", err_hint->data));
2627 : }
2628 :
2629 28 : ExecDropSingleTupleTableSlot(slot);
2630 :
2631 28 : walrcv_clear_result(res);
2632 : }
2633 :
2634 : /*
2635 : * This function is similar to check_publications_origin_tables and serves
2636 : * same purpose for sequences.
2637 : */
2638 : static void
2639 306 : check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications,
2640 : bool copydata, char *origin,
2641 : Oid *subrel_local_oids, int subrel_count,
2642 : char *subname)
2643 : {
2644 : WalRcvExecResult *res;
2645 : StringInfoData cmd;
2646 : TupleTableSlot *slot;
2647 306 : Oid tableRow[1] = {TEXTOID};
2648 306 : List *publist = NIL;
2649 :
2650 : /*
2651 : * Enable sequence synchronization checks only when origin is 'none' , to
2652 : * ensure that sequence data from other origins is not inadvertently
2653 : * copied.
2654 : */
2655 306 : if (!copydata || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0)
2656 286 : return;
2657 :
2658 20 : initStringInfo(&cmd);
2659 20 : appendStringInfoString(&cmd,
2660 : "SELECT DISTINCT P.pubname AS pubname\n"
2661 : "FROM pg_publication P,\n"
2662 : " LATERAL pg_get_publication_sequences(P.pubname) GPS\n"
2663 : " JOIN pg_subscription_rel PS ON (GPS.relid = PS.srrelid),\n"
2664 : " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2665 : "WHERE C.oid = GPS.relid AND P.pubname IN (");
2666 :
2667 20 : GetPublicationsStr(publications, &cmd, true);
2668 20 : appendStringInfoString(&cmd, ")\n");
2669 :
2670 : /*
2671 : * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
2672 : * subrel_local_oids contains the list of relations that are already
2673 : * present on the subscriber. This check should be skipped as these will
2674 : * not be re-synced.
2675 : */
2676 20 : for (int i = 0; i < subrel_count; i++)
2677 : {
2678 0 : Oid relid = subrel_local_oids[i];
2679 0 : char *schemaname = get_namespace_name(get_rel_namespace(relid));
2680 0 : char *seqname = get_rel_name(relid);
2681 :
2682 0 : appendStringInfo(&cmd,
2683 : "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
2684 : schemaname, seqname);
2685 : }
2686 :
2687 20 : res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
2688 20 : pfree(cmd.data);
2689 :
2690 20 : if (res->status != WALRCV_OK_TUPLES)
2691 0 : ereport(ERROR,
2692 : (errcode(ERRCODE_CONNECTION_FAILURE),
2693 : errmsg("could not receive list of replicated sequences from the publisher: %s",
2694 : res->err)));
2695 :
2696 : /* Process publications. */
2697 20 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2698 20 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2699 : {
2700 : char *pubname;
2701 : bool isnull;
2702 :
2703 0 : pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2704 : Assert(!isnull);
2705 :
2706 0 : ExecClearTuple(slot);
2707 0 : publist = list_append_unique(publist, makeString(pubname));
2708 : }
2709 :
2710 : /*
2711 : * Log a warning if the publisher has subscribed to the same sequence from
2712 : * some other publisher. We cannot know the origin of sequences data
2713 : * during the initial sync.
2714 : */
2715 20 : if (publist)
2716 : {
2717 0 : StringInfo pubnames = makeStringInfo();
2718 0 : StringInfo err_msg = makeStringInfo();
2719 0 : StringInfo err_hint = makeStringInfo();
2720 :
2721 : /* Prepare the list of publication(s) for warning message. */
2722 0 : GetPublicationsStr(publist, pubnames, false);
2723 :
2724 0 : appendStringInfo(err_msg, _("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin"),
2725 : subname);
2726 0 : appendStringInfoString(err_hint, _("Verify that initial data copied from the publisher sequences did not come from other origins."));
2727 :
2728 0 : ereport(WARNING,
2729 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2730 : errmsg_internal("%s", err_msg->data),
2731 : errdetail_plural("The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.",
2732 : "The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.",
2733 : list_length(publist), pubnames->data),
2734 : errhint_internal("%s", err_hint->data));
2735 : }
2736 :
2737 20 : ExecDropSingleTupleTableSlot(slot);
2738 :
2739 20 : walrcv_clear_result(res);
2740 : }
2741 :
2742 : /*
2743 : * Determine whether the retain_dead_tuples can be enabled based on the
2744 : * publisher's status.
2745 : *
2746 : * This option is disallowed if the publisher is running a version earlier
2747 : * than the PG19, or if the publisher is in recovery (i.e., it is a standby
2748 : * server).
2749 : *
2750 : * See comments atop worker.c for a detailed explanation.
2751 : */
2752 : static void
2753 24 : check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
2754 : {
2755 : WalRcvExecResult *res;
2756 24 : Oid RecoveryRow[1] = {BOOLOID};
2757 : TupleTableSlot *slot;
2758 : bool isnull;
2759 : bool remote_in_recovery;
2760 :
2761 24 : if (walrcv_server_version(wrconn) < 19000)
2762 0 : ereport(ERROR,
2763 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2764 : errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
2765 :
2766 24 : res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow);
2767 :
2768 24 : if (res->status != WALRCV_OK_TUPLES)
2769 0 : ereport(ERROR,
2770 : (errcode(ERRCODE_CONNECTION_FAILURE),
2771 : errmsg("could not obtain recovery progress from the publisher: %s",
2772 : res->err)));
2773 :
2774 24 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2775 24 : if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2776 0 : elog(ERROR, "failed to fetch tuple for the recovery progress");
2777 :
2778 24 : remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull));
2779 :
2780 24 : if (remote_in_recovery)
2781 0 : ereport(ERROR,
2782 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2783 : errmsg("cannot enable retain_dead_tuples if the publisher is in recovery."));
2784 :
2785 24 : ExecDropSingleTupleTableSlot(slot);
2786 :
2787 24 : walrcv_clear_result(res);
2788 24 : }
2789 :
2790 : /*
2791 : * Check if the subscriber's configuration is adequate to enable the
2792 : * retain_dead_tuples option.
2793 : *
2794 : * Issue an ERROR if the wal_level does not support the use of replication
2795 : * slots when check_guc is set to true.
2796 : *
2797 : * Issue a WARNING if track_commit_timestamp is not enabled when check_guc is
2798 : * set to true. This is only to highlight the importance of enabling
2799 : * track_commit_timestamp instead of catching all the misconfigurations, as
2800 : * this setting can be adjusted after subscription creation. Without it, the
2801 : * apply worker will simply skip conflict detection.
2802 : *
2803 : * Issue a WARNING or NOTICE if the subscription is disabled and the retention
2804 : * is active. Do not raise an ERROR since users can only modify
2805 : * retain_dead_tuples for disabled subscriptions. And as long as the
2806 : * subscription is enabled promptly, it will not pose issues.
2807 : *
2808 : * Issue a NOTICE to inform users that max_retention_duration is
2809 : * ineffective when retain_dead_tuples is disabled for a subscription. An ERROR
2810 : * is not issued because setting max_retention_duration causes no harm,
2811 : * even when it is ineffective.
2812 : */
2813 : void
2814 480 : CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
2815 : int elevel_for_sub_disabled,
2816 : bool retain_dead_tuples, bool retention_active,
2817 : bool max_retention_set)
2818 : {
2819 : Assert(elevel_for_sub_disabled == NOTICE ||
2820 : elevel_for_sub_disabled == WARNING);
2821 :
2822 480 : if (retain_dead_tuples)
2823 : {
2824 34 : if (check_guc && wal_level < WAL_LEVEL_REPLICA)
2825 0 : ereport(ERROR,
2826 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2827 : errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
2828 : errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
2829 :
2830 34 : if (check_guc && !track_commit_timestamp)
2831 8 : ereport(WARNING,
2832 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2833 : errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
2834 : errhint("Consider setting \"%s\" to true.",
2835 : "track_commit_timestamp"));
2836 :
2837 34 : if (sub_disabled && retention_active)
2838 14 : ereport(elevel_for_sub_disabled,
2839 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2840 : errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
2841 : (elevel_for_sub_disabled > NOTICE)
2842 : ? errhint("Consider setting %s to false.",
2843 : "retain_dead_tuples") : 0);
2844 : }
2845 446 : else if (max_retention_set)
2846 : {
2847 6 : ereport(NOTICE,
2848 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2849 : errmsg("max_retention_duration is ineffective when retain_dead_tuples is disabled"));
2850 : }
2851 480 : }
2852 :
2853 : /*
2854 : * Return true iff 'rv' is a member of the list.
2855 : */
2856 : static bool
2857 544 : list_member_rangevar(const List *list, RangeVar *rv)
2858 : {
2859 2000 : foreach_ptr(PublicationRelKind, relinfo, list)
2860 : {
2861 916 : if (equal(relinfo->rv, rv))
2862 2 : return true;
2863 : }
2864 :
2865 542 : return false;
2866 : }
2867 :
2868 : /*
2869 : * Get the list of tables and sequences which belong to specified publications
2870 : * on the publisher connection.
2871 : *
2872 : * Note that we don't support the case where the column list is different for
2873 : * the same table in different publications to avoid sending unwanted column
2874 : * information for some of the rows. This can happen when both the column
2875 : * list and row filter are specified for different publications.
2876 : */
2877 : static List *
2878 306 : fetch_relation_list(WalReceiverConn *wrconn, List *publications)
2879 : {
2880 : WalRcvExecResult *res;
2881 : StringInfoData cmd;
2882 : TupleTableSlot *slot;
2883 306 : Oid tableRow[4] = {TEXTOID, TEXTOID, CHAROID, InvalidOid};
2884 306 : List *relationlist = NIL;
2885 306 : int server_version = walrcv_server_version(wrconn);
2886 306 : bool check_columnlist = (server_version >= 150000);
2887 306 : int column_count = check_columnlist ? 4 : 3;
2888 306 : StringInfo pub_names = makeStringInfo();
2889 :
2890 306 : initStringInfo(&cmd);
2891 :
2892 : /* Build the pub_names comma-separated string. */
2893 306 : GetPublicationsStr(publications, pub_names, true);
2894 :
2895 : /* Get the list of relations from the publisher */
2896 306 : if (server_version >= 160000)
2897 : {
2898 306 : tableRow[3] = INT2VECTOROID;
2899 :
2900 : /*
2901 : * From version 16, we allowed passing multiple publications to the
2902 : * function pg_get_publication_tables. This helped to filter out the
2903 : * partition table whose ancestor is also published in this
2904 : * publication array.
2905 : *
2906 : * Join pg_get_publication_tables with pg_publication to exclude
2907 : * non-existing publications.
2908 : *
2909 : * Note that attrs are always stored in sorted order so we don't need
2910 : * to worry if different publications have specified them in a
2911 : * different order. See pub_collist_validate.
2912 : */
2913 306 : appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, c.relkind, gpt.attrs\n"
2914 : " FROM pg_class c\n"
2915 : " JOIN pg_namespace n ON n.oid = c.relnamespace\n"
2916 : " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
2917 : " FROM pg_publication\n"
2918 : " WHERE pubname IN ( %s )) AS gpt\n"
2919 : " ON gpt.relid = c.oid\n",
2920 : pub_names->data);
2921 :
2922 : /* From version 19, inclusion of sequences in the target is supported */
2923 306 : if (server_version >= 190000)
2924 306 : appendStringInfo(&cmd,
2925 : "UNION ALL\n"
2926 : " SELECT DISTINCT s.schemaname, s.sequencename, " CppAsString2(RELKIND_SEQUENCE) "::\"char\" AS relkind, NULL::int2vector AS attrs\n"
2927 : " FROM pg_catalog.pg_publication_sequences s\n"
2928 : " WHERE s.pubname IN ( %s )",
2929 : pub_names->data);
2930 : }
2931 : else
2932 : {
2933 0 : tableRow[3] = NAMEARRAYOID;
2934 0 : appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, " CppAsString2(RELKIND_RELATION) "::\"char\" AS relkind \n");
2935 :
2936 : /* Get column lists for each relation if the publisher supports it */
2937 0 : if (check_columnlist)
2938 0 : appendStringInfoString(&cmd, ", t.attnames\n");
2939 :
2940 0 : appendStringInfo(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
2941 : " WHERE t.pubname IN ( %s )",
2942 : pub_names->data);
2943 : }
2944 :
2945 306 : destroyStringInfo(pub_names);
2946 :
2947 306 : res = walrcv_exec(wrconn, cmd.data, column_count, tableRow);
2948 306 : pfree(cmd.data);
2949 :
2950 306 : if (res->status != WALRCV_OK_TUPLES)
2951 0 : ereport(ERROR,
2952 : (errcode(ERRCODE_CONNECTION_FAILURE),
2953 : errmsg("could not receive list of replicated tables from the publisher: %s",
2954 : res->err)));
2955 :
2956 : /* Process tables. */
2957 306 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2958 850 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2959 : {
2960 : char *nspname;
2961 : char *relname;
2962 : bool isnull;
2963 : char relkind;
2964 546 : PublicationRelKind *relinfo = palloc_object(PublicationRelKind);
2965 :
2966 546 : nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2967 : Assert(!isnull);
2968 546 : relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
2969 : Assert(!isnull);
2970 546 : relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
2971 : Assert(!isnull);
2972 :
2973 546 : relinfo->rv = makeRangeVar(nspname, relname, -1);
2974 546 : relinfo->relkind = relkind;
2975 :
2976 546 : if (relkind != RELKIND_SEQUENCE &&
2977 544 : check_columnlist &&
2978 544 : list_member_rangevar(relationlist, relinfo->rv))
2979 2 : ereport(ERROR,
2980 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2981 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
2982 : nspname, relname));
2983 : else
2984 544 : relationlist = lappend(relationlist, relinfo);
2985 :
2986 544 : ExecClearTuple(slot);
2987 : }
2988 304 : ExecDropSingleTupleTableSlot(slot);
2989 :
2990 304 : walrcv_clear_result(res);
2991 :
2992 304 : return relationlist;
2993 : }
2994 :
2995 : /*
2996 : * This is to report the connection failure while dropping replication slots.
2997 : * Here, we report the WARNING for all tablesync slots so that user can drop
2998 : * them manually, if required.
2999 : */
3000 : static void
3001 0 : ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
3002 : {
3003 : ListCell *lc;
3004 :
3005 0 : foreach(lc, rstates)
3006 : {
3007 0 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
3008 0 : Oid relid = rstate->relid;
3009 :
3010 : /* Only cleanup resources of tablesync workers */
3011 0 : if (!OidIsValid(relid))
3012 0 : continue;
3013 :
3014 : /*
3015 : * Caller needs to ensure that relstate doesn't change underneath us.
3016 : * See DropSubscription where we get the relstates.
3017 : */
3018 0 : if (rstate->state != SUBREL_STATE_SYNCDONE)
3019 : {
3020 0 : char syncslotname[NAMEDATALEN] = {0};
3021 :
3022 0 : ReplicationSlotNameForTablesync(subid, relid, syncslotname,
3023 : sizeof(syncslotname));
3024 0 : elog(WARNING, "could not drop tablesync replication slot \"%s\"",
3025 : syncslotname);
3026 : }
3027 : }
3028 :
3029 0 : ereport(ERROR,
3030 : (errcode(ERRCODE_CONNECTION_FAILURE),
3031 : errmsg("could not connect to publisher when attempting to drop replication slot \"%s\": %s",
3032 : slotname, err),
3033 : /* translator: %s is an SQL ALTER command */
3034 : errhint("Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
3035 : "ALTER SUBSCRIPTION ... DISABLE",
3036 : "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
3037 : }
3038 :
3039 : /*
3040 : * Check for duplicates in the given list of publications and error out if
3041 : * found one. Add publications to datums as text datums, if datums is not
3042 : * NULL.
3043 : */
3044 : static void
3045 448 : check_duplicates_in_publist(List *publist, Datum *datums)
3046 : {
3047 : ListCell *cell;
3048 448 : int j = 0;
3049 :
3050 1020 : foreach(cell, publist)
3051 : {
3052 590 : char *name = strVal(lfirst(cell));
3053 : ListCell *pcell;
3054 :
3055 888 : foreach(pcell, publist)
3056 : {
3057 888 : char *pname = strVal(lfirst(pcell));
3058 :
3059 888 : if (pcell == cell)
3060 572 : break;
3061 :
3062 316 : if (strcmp(name, pname) == 0)
3063 18 : ereport(ERROR,
3064 : (errcode(ERRCODE_DUPLICATE_OBJECT),
3065 : errmsg("publication name \"%s\" used more than once",
3066 : pname)));
3067 : }
3068 :
3069 572 : if (datums)
3070 486 : datums[j++] = CStringGetTextDatum(name);
3071 : }
3072 430 : }
3073 :
3074 : /*
3075 : * Merge current subscription's publications and user-specified publications
3076 : * from ADD/DROP PUBLICATIONS.
3077 : *
3078 : * If addpub is true, we will add the list of publications into oldpublist.
3079 : * Otherwise, we will delete the list of publications from oldpublist. The
3080 : * returned list is a copy, oldpublist itself is not changed.
3081 : *
3082 : * subname is the subscription name, for error messages.
3083 : */
3084 : static List *
3085 54 : merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
3086 : {
3087 : ListCell *lc;
3088 :
3089 54 : oldpublist = list_copy(oldpublist);
3090 :
3091 54 : check_duplicates_in_publist(newpublist, NULL);
3092 :
3093 92 : foreach(lc, newpublist)
3094 : {
3095 68 : char *name = strVal(lfirst(lc));
3096 : ListCell *lc2;
3097 68 : bool found = false;
3098 :
3099 134 : foreach(lc2, oldpublist)
3100 : {
3101 110 : char *pubname = strVal(lfirst(lc2));
3102 :
3103 110 : if (strcmp(name, pubname) == 0)
3104 : {
3105 44 : found = true;
3106 44 : if (addpub)
3107 12 : ereport(ERROR,
3108 : (errcode(ERRCODE_DUPLICATE_OBJECT),
3109 : errmsg("publication \"%s\" is already in subscription \"%s\"",
3110 : name, subname)));
3111 : else
3112 32 : oldpublist = foreach_delete_current(oldpublist, lc2);
3113 :
3114 32 : break;
3115 : }
3116 : }
3117 :
3118 56 : if (addpub && !found)
3119 18 : oldpublist = lappend(oldpublist, makeString(name));
3120 38 : else if (!addpub && !found)
3121 6 : ereport(ERROR,
3122 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
3123 : errmsg("publication \"%s\" is not in subscription \"%s\"",
3124 : name, subname)));
3125 : }
3126 :
3127 : /*
3128 : * XXX Probably no strong reason for this, but for now it's to make ALTER
3129 : * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
3130 : */
3131 24 : if (!oldpublist)
3132 6 : ereport(ERROR,
3133 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
3134 : errmsg("cannot drop all the publications from a subscription")));
3135 :
3136 18 : return oldpublist;
3137 : }
3138 :
3139 : /*
3140 : * Extract the streaming mode value from a DefElem. This is like
3141 : * defGetBoolean() but also accepts the special value of "parallel".
3142 : */
3143 : char
3144 832 : defGetStreamingMode(DefElem *def)
3145 : {
3146 : /*
3147 : * If no parameter value given, assume "true" is meant.
3148 : */
3149 832 : if (!def->arg)
3150 0 : return LOGICALREP_STREAM_ON;
3151 :
3152 : /*
3153 : * Allow 0, 1, "false", "true", "off", "on" or "parallel".
3154 : */
3155 832 : switch (nodeTag(def->arg))
3156 : {
3157 0 : case T_Integer:
3158 0 : switch (intVal(def->arg))
3159 : {
3160 0 : case 0:
3161 0 : return LOGICALREP_STREAM_OFF;
3162 0 : case 1:
3163 0 : return LOGICALREP_STREAM_ON;
3164 0 : default:
3165 : /* otherwise, error out below */
3166 0 : break;
3167 : }
3168 0 : break;
3169 832 : default:
3170 : {
3171 832 : char *sval = defGetString(def);
3172 :
3173 : /*
3174 : * The set of strings accepted here should match up with the
3175 : * grammar's opt_boolean_or_string production.
3176 : */
3177 1658 : if (pg_strcasecmp(sval, "false") == 0 ||
3178 826 : pg_strcasecmp(sval, "off") == 0)
3179 12 : return LOGICALREP_STREAM_OFF;
3180 1622 : if (pg_strcasecmp(sval, "true") == 0 ||
3181 802 : pg_strcasecmp(sval, "on") == 0)
3182 92 : return LOGICALREP_STREAM_ON;
3183 728 : if (pg_strcasecmp(sval, "parallel") == 0)
3184 722 : return LOGICALREP_STREAM_PARALLEL;
3185 : }
3186 6 : break;
3187 : }
3188 :
3189 6 : ereport(ERROR,
3190 : (errcode(ERRCODE_SYNTAX_ERROR),
3191 : errmsg("%s requires a Boolean value or \"parallel\"",
3192 : def->defname)));
3193 : return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
3194 : }
|