Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * subscriptioncmds.c
4 : * subscription catalog manipulation functions
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/subscriptioncmds.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/htup_details.h"
18 : #include "access/table.h"
19 : #include "access/twophase.h"
20 : #include "access/xact.h"
21 : #include "catalog/catalog.h"
22 : #include "catalog/dependency.h"
23 : #include "catalog/indexing.h"
24 : #include "catalog/namespace.h"
25 : #include "catalog/objectaccess.h"
26 : #include "catalog/objectaddress.h"
27 : #include "catalog/pg_authid_d.h"
28 : #include "catalog/pg_database_d.h"
29 : #include "catalog/pg_subscription.h"
30 : #include "catalog/pg_subscription_rel.h"
31 : #include "catalog/pg_type.h"
32 : #include "commands/dbcommands.h"
33 : #include "commands/defrem.h"
34 : #include "commands/event_trigger.h"
35 : #include "commands/subscriptioncmds.h"
36 : #include "executor/executor.h"
37 : #include "miscadmin.h"
38 : #include "nodes/makefuncs.h"
39 : #include "pgstat.h"
40 : #include "replication/logicallauncher.h"
41 : #include "replication/logicalworker.h"
42 : #include "replication/origin.h"
43 : #include "replication/slot.h"
44 : #include "replication/walreceiver.h"
45 : #include "replication/walsender.h"
46 : #include "replication/worker_internal.h"
47 : #include "storage/lmgr.h"
48 : #include "utils/acl.h"
49 : #include "utils/builtins.h"
50 : #include "utils/guc.h"
51 : #include "utils/lsyscache.h"
52 : #include "utils/memutils.h"
53 : #include "utils/pg_lsn.h"
54 : #include "utils/syscache.h"
55 :
/*
 * One bit per option that a user may write in the WITH (...) list of a
 * CREATE/ALTER SUBSCRIPTION command.  The same bits are used both to tell
 * the option parser which options a given command accepts and to record
 * which options the user actually supplied.
 */
#define SUBOPT_CONNECT				(1 << 0)
#define SUBOPT_ENABLED				(1 << 1)
#define SUBOPT_CREATE_SLOT			(1 << 2)
#define SUBOPT_SLOT_NAME			(1 << 3)
#define SUBOPT_COPY_DATA			(1 << 4)
#define SUBOPT_SYNCHRONOUS_COMMIT	(1 << 5)
#define SUBOPT_REFRESH				(1 << 6)
#define SUBOPT_BINARY				(1 << 7)
#define SUBOPT_STREAMING			(1 << 8)
#define SUBOPT_TWOPHASE_COMMIT		(1 << 9)
#define SUBOPT_DISABLE_ON_ERR		(1 << 10)
#define SUBOPT_PASSWORD_REQUIRED	(1 << 11)
#define SUBOPT_RUN_AS_OWNER			(1 << 12)
#define SUBOPT_FAILOVER				(1 << 13)
#define SUBOPT_LSN					(1 << 14)
#define SUBOPT_ORIGIN				(1 << 15)

/*
 * True when every bit of 'bits' is also set in 'val'.  Note that this is an
 * "all of" test, not an "any of" test, when 'bits' has several bits set.
 */
#define IsSet(val, bits)  (((val) & (bits)) == (bits))
79 :
80 : /*
81 : * Structure to hold a bitmap representing the user-provided CREATE/ALTER
82 : * SUBSCRIPTION command options and the parsed/default values of each of them.
83 : */
84 : typedef struct SubOpts
85 : {
86 : bits32 specified_opts;
87 : char *slot_name;
88 : char *synchronous_commit;
89 : bool connect;
90 : bool enabled;
91 : bool create_slot;
92 : bool copy_data;
93 : bool refresh;
94 : bool binary;
95 : char streaming;
96 : bool twophase;
97 : bool disableonerr;
98 : bool passwordrequired;
99 : bool runasowner;
100 : bool failover;
101 : char *origin;
102 : XLogRecPtr lsn;
103 : } SubOpts;
104 :
105 : static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
106 : static void check_publications_origin(WalReceiverConn *wrconn,
107 : List *publications, bool copydata,
108 : char *origin, Oid *subrel_local_oids,
109 : int subrel_count, char *subname);
110 : static void check_duplicates_in_publist(List *publist, Datum *datums);
111 : static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
112 : static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
113 : static void CheckAlterSubOption(Subscription *sub, const char *option,
114 : bool slot_needs_update, bool isTopLevel);
115 :
116 :
117 : /*
118 : * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
119 : *
120 : * Since not all options can be specified in both commands, this function
121 : * will report an error if mutually exclusive options are specified.
122 : */
123 : static void
124 854 : parse_subscription_options(ParseState *pstate, List *stmt_options,
125 : bits32 supported_opts, SubOpts *opts)
126 : {
127 : ListCell *lc;
128 :
129 : /* Start out with cleared opts. */
130 854 : memset(opts, 0, sizeof(SubOpts));
131 :
132 : /* caller must expect some option */
133 : Assert(supported_opts != 0);
134 :
135 : /* If connect option is supported, these others also need to be. */
136 : Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
137 : IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
138 : SUBOPT_COPY_DATA));
139 :
140 : /* Set default values for the supported options. */
141 854 : if (IsSet(supported_opts, SUBOPT_CONNECT))
142 436 : opts->connect = true;
143 854 : if (IsSet(supported_opts, SUBOPT_ENABLED))
144 516 : opts->enabled = true;
145 854 : if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
146 436 : opts->create_slot = true;
147 854 : if (IsSet(supported_opts, SUBOPT_COPY_DATA))
148 566 : opts->copy_data = true;
149 854 : if (IsSet(supported_opts, SUBOPT_REFRESH))
150 82 : opts->refresh = true;
151 854 : if (IsSet(supported_opts, SUBOPT_BINARY))
152 620 : opts->binary = false;
153 854 : if (IsSet(supported_opts, SUBOPT_STREAMING))
154 620 : opts->streaming = LOGICALREP_STREAM_PARALLEL;
155 854 : if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
156 620 : opts->twophase = false;
157 854 : if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
158 620 : opts->disableonerr = false;
159 854 : if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED))
160 620 : opts->passwordrequired = true;
161 854 : if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER))
162 620 : opts->runasowner = false;
163 854 : if (IsSet(supported_opts, SUBOPT_FAILOVER))
164 620 : opts->failover = false;
165 854 : if (IsSet(supported_opts, SUBOPT_ORIGIN))
166 620 : opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
167 :
168 : /* Parse options */
169 1678 : foreach(lc, stmt_options)
170 : {
171 878 : DefElem *defel = (DefElem *) lfirst(lc);
172 :
173 878 : if (IsSet(supported_opts, SUBOPT_CONNECT) &&
174 514 : strcmp(defel->defname, "connect") == 0)
175 : {
176 164 : if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
177 0 : errorConflictingDefElem(defel, pstate);
178 :
179 164 : opts->specified_opts |= SUBOPT_CONNECT;
180 164 : opts->connect = defGetBoolean(defel);
181 : }
182 714 : else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
183 430 : strcmp(defel->defname, "enabled") == 0)
184 : {
185 116 : if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
186 0 : errorConflictingDefElem(defel, pstate);
187 :
188 116 : opts->specified_opts |= SUBOPT_ENABLED;
189 116 : opts->enabled = defGetBoolean(defel);
190 : }
191 598 : else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
192 314 : strcmp(defel->defname, "create_slot") == 0)
193 : {
194 40 : if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
195 0 : errorConflictingDefElem(defel, pstate);
196 :
197 40 : opts->specified_opts |= SUBOPT_CREATE_SLOT;
198 40 : opts->create_slot = defGetBoolean(defel);
199 : }
200 558 : else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
201 462 : strcmp(defel->defname, "slot_name") == 0)
202 : {
203 144 : if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
204 0 : errorConflictingDefElem(defel, pstate);
205 :
206 144 : opts->specified_opts |= SUBOPT_SLOT_NAME;
207 144 : opts->slot_name = defGetString(defel);
208 :
209 : /* Setting slot_name = NONE is treated as no slot name. */
210 144 : if (strcmp(opts->slot_name, "none") == 0)
211 110 : opts->slot_name = NULL;
212 : else
213 34 : ReplicationSlotValidateName(opts->slot_name, ERROR);
214 : }
215 414 : else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
216 268 : strcmp(defel->defname, "copy_data") == 0)
217 : {
218 48 : if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
219 0 : errorConflictingDefElem(defel, pstate);
220 :
221 48 : opts->specified_opts |= SUBOPT_COPY_DATA;
222 48 : opts->copy_data = defGetBoolean(defel);
223 : }
224 366 : else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
225 276 : strcmp(defel->defname, "synchronous_commit") == 0)
226 : {
227 12 : if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
228 0 : errorConflictingDefElem(defel, pstate);
229 :
230 12 : opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
231 12 : opts->synchronous_commit = defGetString(defel);
232 :
233 : /* Test if the given value is valid for synchronous_commit GUC. */
234 12 : (void) set_config_option("synchronous_commit", opts->synchronous_commit,
235 : PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
236 : false, 0, false);
237 : }
238 354 : else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
239 66 : strcmp(defel->defname, "refresh") == 0)
240 : {
241 66 : if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
242 0 : errorConflictingDefElem(defel, pstate);
243 :
244 66 : opts->specified_opts |= SUBOPT_REFRESH;
245 66 : opts->refresh = defGetBoolean(defel);
246 : }
247 288 : else if (IsSet(supported_opts, SUBOPT_BINARY) &&
248 264 : strcmp(defel->defname, "binary") == 0)
249 : {
250 32 : if (IsSet(opts->specified_opts, SUBOPT_BINARY))
251 0 : errorConflictingDefElem(defel, pstate);
252 :
253 32 : opts->specified_opts |= SUBOPT_BINARY;
254 32 : opts->binary = defGetBoolean(defel);
255 : }
256 256 : else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
257 232 : strcmp(defel->defname, "streaming") == 0)
258 : {
259 74 : if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
260 0 : errorConflictingDefElem(defel, pstate);
261 :
262 74 : opts->specified_opts |= SUBOPT_STREAMING;
263 74 : opts->streaming = defGetStreamingMode(defel);
264 : }
265 182 : else if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
266 158 : strcmp(defel->defname, "two_phase") == 0)
267 : {
268 36 : if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
269 0 : errorConflictingDefElem(defel, pstate);
270 :
271 36 : opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
272 36 : opts->twophase = defGetBoolean(defel);
273 : }
274 146 : else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
275 122 : strcmp(defel->defname, "disable_on_error") == 0)
276 : {
277 20 : if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
278 0 : errorConflictingDefElem(defel, pstate);
279 :
280 20 : opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
281 20 : opts->disableonerr = defGetBoolean(defel);
282 : }
283 126 : else if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED) &&
284 102 : strcmp(defel->defname, "password_required") == 0)
285 : {
286 24 : if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
287 0 : errorConflictingDefElem(defel, pstate);
288 :
289 24 : opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
290 24 : opts->passwordrequired = defGetBoolean(defel);
291 : }
292 102 : else if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER) &&
293 78 : strcmp(defel->defname, "run_as_owner") == 0)
294 : {
295 18 : if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
296 0 : errorConflictingDefElem(defel, pstate);
297 :
298 18 : opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
299 18 : opts->runasowner = defGetBoolean(defel);
300 : }
301 84 : else if (IsSet(supported_opts, SUBOPT_FAILOVER) &&
302 60 : strcmp(defel->defname, "failover") == 0)
303 : {
304 24 : if (IsSet(opts->specified_opts, SUBOPT_FAILOVER))
305 0 : errorConflictingDefElem(defel, pstate);
306 :
307 24 : opts->specified_opts |= SUBOPT_FAILOVER;
308 24 : opts->failover = defGetBoolean(defel);
309 : }
310 60 : else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
311 36 : strcmp(defel->defname, "origin") == 0)
312 : {
313 30 : if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
314 0 : errorConflictingDefElem(defel, pstate);
315 :
316 30 : opts->specified_opts |= SUBOPT_ORIGIN;
317 30 : pfree(opts->origin);
318 :
319 : /*
320 : * Even though the "origin" parameter allows only "none" and "any"
321 : * values, it is implemented as a string type so that the
322 : * parameter can be extended in future versions to support
323 : * filtering using origin names specified by the user.
324 : */
325 30 : opts->origin = defGetString(defel);
326 :
327 44 : if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
328 14 : (pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
329 6 : ereport(ERROR,
330 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
331 : errmsg("unrecognized origin value: \"%s\"", opts->origin));
332 : }
333 30 : else if (IsSet(supported_opts, SUBOPT_LSN) &&
334 24 : strcmp(defel->defname, "lsn") == 0)
335 18 : {
336 24 : char *lsn_str = defGetString(defel);
337 : XLogRecPtr lsn;
338 :
339 24 : if (IsSet(opts->specified_opts, SUBOPT_LSN))
340 0 : errorConflictingDefElem(defel, pstate);
341 :
342 : /* Setting lsn = NONE is treated as resetting LSN */
343 24 : if (strcmp(lsn_str, "none") == 0)
344 6 : lsn = InvalidXLogRecPtr;
345 : else
346 : {
347 : /* Parse the argument as LSN */
348 18 : lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
349 : CStringGetDatum(lsn_str)));
350 :
351 18 : if (XLogRecPtrIsInvalid(lsn))
352 6 : ereport(ERROR,
353 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
354 : errmsg("invalid WAL location (LSN): %s", lsn_str)));
355 : }
356 :
357 18 : opts->specified_opts |= SUBOPT_LSN;
358 18 : opts->lsn = lsn;
359 : }
360 : else
361 6 : ereport(ERROR,
362 : (errcode(ERRCODE_SYNTAX_ERROR),
363 : errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
364 : }
365 :
366 : /*
367 : * We've been explicitly asked to not connect, that requires some
368 : * additional processing.
369 : */
370 800 : if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
371 : {
372 : /* Check for incompatible options from the user. */
373 134 : if (opts->enabled &&
374 134 : IsSet(opts->specified_opts, SUBOPT_ENABLED))
375 6 : ereport(ERROR,
376 : (errcode(ERRCODE_SYNTAX_ERROR),
377 : /*- translator: both %s are strings of the form "option = value" */
378 : errmsg("%s and %s are mutually exclusive options",
379 : "connect = false", "enabled = true")));
380 :
381 128 : if (opts->create_slot &&
382 122 : IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
383 6 : ereport(ERROR,
384 : (errcode(ERRCODE_SYNTAX_ERROR),
385 : errmsg("%s and %s are mutually exclusive options",
386 : "connect = false", "create_slot = true")));
387 :
388 122 : if (opts->copy_data &&
389 116 : IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
390 6 : ereport(ERROR,
391 : (errcode(ERRCODE_SYNTAX_ERROR),
392 : errmsg("%s and %s are mutually exclusive options",
393 : "connect = false", "copy_data = true")));
394 :
395 : /* Change the defaults of other options. */
396 116 : opts->enabled = false;
397 116 : opts->create_slot = false;
398 116 : opts->copy_data = false;
399 : }
400 :
401 : /*
402 : * Do additional checking for disallowed combination when slot_name = NONE
403 : * was used.
404 : */
405 782 : if (!opts->slot_name &&
406 754 : IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
407 : {
408 104 : if (opts->enabled)
409 : {
410 18 : if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
411 6 : ereport(ERROR,
412 : (errcode(ERRCODE_SYNTAX_ERROR),
413 : /*- translator: both %s are strings of the form "option = value" */
414 : errmsg("%s and %s are mutually exclusive options",
415 : "slot_name = NONE", "enabled = true")));
416 : else
417 12 : ereport(ERROR,
418 : (errcode(ERRCODE_SYNTAX_ERROR),
419 : /*- translator: both %s are strings of the form "option = value" */
420 : errmsg("subscription with %s must also set %s",
421 : "slot_name = NONE", "enabled = false")));
422 : }
423 :
424 86 : if (opts->create_slot)
425 : {
426 12 : if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
427 6 : ereport(ERROR,
428 : (errcode(ERRCODE_SYNTAX_ERROR),
429 : /*- translator: both %s are strings of the form "option = value" */
430 : errmsg("%s and %s are mutually exclusive options",
431 : "slot_name = NONE", "create_slot = true")));
432 : else
433 6 : ereport(ERROR,
434 : (errcode(ERRCODE_SYNTAX_ERROR),
435 : /*- translator: both %s are strings of the form "option = value" */
436 : errmsg("subscription with %s must also set %s",
437 : "slot_name = NONE", "create_slot = false")));
438 : }
439 : }
440 752 : }
441 :
442 : /*
443 : * Check that the specified publications are present on the publisher.
444 : */
445 : static void
446 232 : check_publications(WalReceiverConn *wrconn, List *publications)
447 : {
448 : WalRcvExecResult *res;
449 : StringInfo cmd;
450 : TupleTableSlot *slot;
451 232 : List *publicationsCopy = NIL;
452 232 : Oid tableRow[1] = {TEXTOID};
453 :
454 232 : cmd = makeStringInfo();
455 232 : appendStringInfoString(cmd, "SELECT t.pubname FROM\n"
456 : " pg_catalog.pg_publication t WHERE\n"
457 : " t.pubname IN (");
458 232 : GetPublicationsStr(publications, cmd, true);
459 232 : appendStringInfoChar(cmd, ')');
460 :
461 232 : res = walrcv_exec(wrconn, cmd->data, 1, tableRow);
462 232 : destroyStringInfo(cmd);
463 :
464 232 : if (res->status != WALRCV_OK_TUPLES)
465 0 : ereport(ERROR,
466 : errmsg("could not receive list of publications from the publisher: %s",
467 : res->err));
468 :
469 232 : publicationsCopy = list_copy(publications);
470 :
471 : /* Process publication(s). */
472 232 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
473 516 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
474 : {
475 : char *pubname;
476 : bool isnull;
477 :
478 284 : pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
479 : Assert(!isnull);
480 :
481 : /* Delete the publication present in publisher from the list. */
482 284 : publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
483 284 : ExecClearTuple(slot);
484 : }
485 :
486 232 : ExecDropSingleTupleTableSlot(slot);
487 :
488 232 : walrcv_clear_result(res);
489 :
490 232 : if (list_length(publicationsCopy))
491 : {
492 : /* Prepare the list of non-existent publication(s) for error message. */
493 6 : StringInfo pubnames = makeStringInfo();
494 :
495 6 : GetPublicationsStr(publicationsCopy, pubnames, false);
496 6 : ereport(WARNING,
497 : errcode(ERRCODE_UNDEFINED_OBJECT),
498 : errmsg_plural("publication %s does not exist on the publisher",
499 : "publications %s do not exist on the publisher",
500 : list_length(publicationsCopy),
501 : pubnames->data));
502 : }
503 232 : }
504 :
505 : /*
506 : * Auxiliary function to build a text array out of a list of String nodes.
507 : */
508 : static Datum
509 356 : publicationListToArray(List *publist)
510 : {
511 : ArrayType *arr;
512 : Datum *datums;
513 : MemoryContext memcxt;
514 : MemoryContext oldcxt;
515 :
516 : /* Create memory context for temporary allocations. */
517 356 : memcxt = AllocSetContextCreate(CurrentMemoryContext,
518 : "publicationListToArray to array",
519 : ALLOCSET_DEFAULT_SIZES);
520 356 : oldcxt = MemoryContextSwitchTo(memcxt);
521 :
522 356 : datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
523 :
524 356 : check_duplicates_in_publist(publist, datums);
525 :
526 350 : MemoryContextSwitchTo(oldcxt);
527 :
528 350 : arr = construct_array_builtin(datums, list_length(publist), TEXTOID);
529 :
530 350 : MemoryContextDelete(memcxt);
531 :
532 350 : return PointerGetDatum(arr);
533 : }
534 :
535 : /*
536 : * Create new subscription.
537 : */
538 : ObjectAddress
539 436 : CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
540 : bool isTopLevel)
541 : {
542 : Relation rel;
543 : ObjectAddress myself;
544 : Oid subid;
545 : bool nulls[Natts_pg_subscription];
546 : Datum values[Natts_pg_subscription];
547 436 : Oid owner = GetUserId();
548 : HeapTuple tup;
549 : char *conninfo;
550 : char originname[NAMEDATALEN];
551 : List *publications;
552 : bits32 supported_opts;
553 436 : SubOpts opts = {0};
554 : AclResult aclresult;
555 :
556 : /*
557 : * Parse and check options.
558 : *
559 : * Connection and publication should not be specified here.
560 : */
561 436 : supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
562 : SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
563 : SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
564 : SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
565 : SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
566 : SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN);
567 436 : parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
568 :
569 : /*
570 : * Since creating a replication slot is not transactional, rolling back
571 : * the transaction leaves the created replication slot. So we cannot run
572 : * CREATE SUBSCRIPTION inside a transaction block if creating a
573 : * replication slot.
574 : */
575 358 : if (opts.create_slot)
576 232 : PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
577 :
578 : /*
579 : * We don't want to allow unprivileged users to be able to trigger
580 : * attempts to access arbitrary network destinations, so require the user
581 : * to have been specifically authorized to create subscriptions.
582 : */
583 352 : if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
584 6 : ereport(ERROR,
585 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
586 : errmsg("permission denied to create subscription"),
587 : errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
588 : "pg_create_subscription")));
589 :
590 : /*
591 : * Since a subscription is a database object, we also check for CREATE
592 : * permission on the database.
593 : */
594 346 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
595 : owner, ACL_CREATE);
596 346 : if (aclresult != ACLCHECK_OK)
597 12 : aclcheck_error(aclresult, OBJECT_DATABASE,
598 6 : get_database_name(MyDatabaseId));
599 :
600 : /*
601 : * Non-superusers are required to set a password for authentication, and
602 : * that password must be used by the target server, but the superuser can
603 : * exempt a subscription from this requirement.
604 : */
605 340 : if (!opts.passwordrequired && !superuser_arg(owner))
606 6 : ereport(ERROR,
607 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
608 : errmsg("password_required=false is superuser-only"),
609 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
610 :
611 : /*
612 : * If built with appropriate switch, whine when regression-testing
613 : * conventions for subscription names are violated.
614 : */
615 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
616 : if (strncmp(stmt->subname, "regress_", 8) != 0)
617 : elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
618 : #endif
619 :
620 334 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
621 :
622 : /* Check if name is used */
623 334 : subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
624 : MyDatabaseId, CStringGetDatum(stmt->subname));
625 334 : if (OidIsValid(subid))
626 : {
627 6 : ereport(ERROR,
628 : (errcode(ERRCODE_DUPLICATE_OBJECT),
629 : errmsg("subscription \"%s\" already exists",
630 : stmt->subname)));
631 : }
632 :
633 328 : if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
634 286 : opts.slot_name == NULL)
635 286 : opts.slot_name = stmt->subname;
636 :
637 : /* The default for synchronous_commit of subscriptions is off. */
638 328 : if (opts.synchronous_commit == NULL)
639 328 : opts.synchronous_commit = "off";
640 :
641 328 : conninfo = stmt->conninfo;
642 328 : publications = stmt->publication;
643 :
644 : /* Load the library providing us libpq calls. */
645 328 : load_file("libpqwalreceiver", false);
646 :
647 : /* Check the connection info string. */
648 328 : walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
649 :
650 : /* Everything ok, form a new tuple. */
651 310 : memset(values, 0, sizeof(values));
652 310 : memset(nulls, false, sizeof(nulls));
653 :
654 310 : subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
655 : Anum_pg_subscription_oid);
656 310 : values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
657 310 : values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
658 310 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
659 310 : values[Anum_pg_subscription_subname - 1] =
660 310 : DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
661 310 : values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
662 310 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
663 310 : values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
664 310 : values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
665 310 : values[Anum_pg_subscription_subtwophasestate - 1] =
666 310 : CharGetDatum(opts.twophase ?
667 : LOGICALREP_TWOPHASE_STATE_PENDING :
668 : LOGICALREP_TWOPHASE_STATE_DISABLED);
669 310 : values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
670 310 : values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
671 310 : values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
672 310 : values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
673 310 : values[Anum_pg_subscription_subconninfo - 1] =
674 310 : CStringGetTextDatum(conninfo);
675 310 : if (opts.slot_name)
676 290 : values[Anum_pg_subscription_subslotname - 1] =
677 290 : DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
678 : else
679 20 : nulls[Anum_pg_subscription_subslotname - 1] = true;
680 310 : values[Anum_pg_subscription_subsynccommit - 1] =
681 310 : CStringGetTextDatum(opts.synchronous_commit);
682 304 : values[Anum_pg_subscription_subpublications - 1] =
683 310 : publicationListToArray(publications);
684 304 : values[Anum_pg_subscription_suborigin - 1] =
685 304 : CStringGetTextDatum(opts.origin);
686 :
687 304 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
688 :
689 : /* Insert tuple into catalog. */
690 304 : CatalogTupleInsert(rel, tup);
691 304 : heap_freetuple(tup);
692 :
693 304 : recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
694 :
695 304 : ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
696 304 : replorigin_create(originname);
697 :
698 : /*
699 : * Connect to remote side to execute requested commands and fetch table
700 : * info.
701 : */
702 304 : if (opts.connect)
703 : {
704 : char *err;
705 : WalReceiverConn *wrconn;
706 : List *tables;
707 : ListCell *lc;
708 : char table_state;
709 : bool must_use_password;
710 :
711 : /* Try to connect to the publisher. */
712 224 : must_use_password = !superuser_arg(owner) && opts.passwordrequired;
713 224 : wrconn = walrcv_connect(conninfo, true, true, must_use_password,
714 : stmt->subname, &err);
715 224 : if (!wrconn)
716 6 : ereport(ERROR,
717 : (errcode(ERRCODE_CONNECTION_FAILURE),
718 : errmsg("subscription \"%s\" could not connect to the publisher: %s",
719 : stmt->subname, err)));
720 :
721 218 : PG_TRY();
722 : {
723 218 : check_publications(wrconn, publications);
724 218 : check_publications_origin(wrconn, publications, opts.copy_data,
725 : opts.origin, NULL, 0, stmt->subname);
726 :
727 : /*
728 : * Set sync state based on if we were asked to do data copy or
729 : * not.
730 : */
731 218 : table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
732 :
733 : /*
734 : * Get the table list from publisher and build local table status
735 : * info.
736 : */
737 218 : tables = fetch_table_list(wrconn, publications);
738 554 : foreach(lc, tables)
739 : {
740 338 : RangeVar *rv = (RangeVar *) lfirst(lc);
741 : Oid relid;
742 :
743 338 : relid = RangeVarGetRelid(rv, AccessShareLock, false);
744 :
745 : /* Check for supported relkind. */
746 338 : CheckSubscriptionRelkind(get_rel_relkind(relid),
747 338 : rv->schemaname, rv->relname);
748 :
749 338 : AddSubscriptionRelState(subid, relid, table_state,
750 : InvalidXLogRecPtr, true);
751 : }
752 :
753 : /*
754 : * If requested, create permanent slot for the subscription. We
755 : * won't use the initial snapshot for anything, so no need to
756 : * export it.
757 : */
758 216 : if (opts.create_slot)
759 : {
760 206 : bool twophase_enabled = false;
761 :
762 : Assert(opts.slot_name);
763 :
764 : /*
765 : * Even if two_phase is set, don't create the slot with
766 : * two-phase enabled. Will enable it once all the tables are
767 : * synced and ready. This avoids race-conditions like prepared
768 : * transactions being skipped due to changes not being applied
769 : * due to checks in should_apply_changes_for_rel() when
770 : * tablesync for the corresponding tables are in progress. See
771 : * comments atop worker.c.
772 : *
773 : * Note that if tables were specified but copy_data is false
774 : * then it is safe to enable two_phase up-front because those
775 : * tables are already initially in READY state. When the
776 : * subscription has no tables, we leave the twophase state as
777 : * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
778 : * PUBLICATION to work.
779 : */
780 206 : if (opts.twophase && !opts.copy_data && tables != NIL)
781 2 : twophase_enabled = true;
782 :
783 206 : walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
784 : opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
785 :
786 206 : if (twophase_enabled)
787 2 : UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
788 :
789 206 : ereport(NOTICE,
790 : (errmsg("created replication slot \"%s\" on publisher",
791 : opts.slot_name)));
792 : }
793 : }
794 2 : PG_FINALLY();
795 : {
796 218 : walrcv_disconnect(wrconn);
797 : }
798 218 : PG_END_TRY();
799 : }
800 : else
801 80 : ereport(WARNING,
802 : (errmsg("subscription was created, but is not connected"),
803 : errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.")));
804 :
805 296 : table_close(rel, RowExclusiveLock);
806 :
807 296 : pgstat_create_subscription(subid);
808 :
809 296 : if (opts.enabled)
810 204 : ApplyLauncherWakeupAtCommit();
811 :
812 296 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
813 :
814 296 : InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
815 :
816 296 : return myself;
817 : }
818 :
819 : static void
820 58 : AlterSubscription_refresh(Subscription *sub, bool copy_data,
821 : List *validate_publications)
822 : {
823 : char *err;
824 : List *pubrel_names;
825 : List *subrel_states;
826 : Oid *subrel_local_oids;
827 : Oid *pubrel_local_oids;
828 : ListCell *lc;
829 : int off;
830 : int remove_rel_len;
831 : int subrel_count;
832 58 : Relation rel = NULL;
833 : typedef struct SubRemoveRels
834 : {
835 : Oid relid;
836 : char state;
837 : } SubRemoveRels;
838 : SubRemoveRels *sub_remove_rels;
839 : WalReceiverConn *wrconn;
840 : bool must_use_password;
841 :
842 : /* Load the library providing us libpq calls. */
843 58 : load_file("libpqwalreceiver", false);
844 :
845 : /* Try to connect to the publisher. */
846 58 : must_use_password = sub->passwordrequired && !sub->ownersuperuser;
847 58 : wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
848 : sub->name, &err);
849 56 : if (!wrconn)
850 0 : ereport(ERROR,
851 : (errcode(ERRCODE_CONNECTION_FAILURE),
852 : errmsg("subscription \"%s\" could not connect to the publisher: %s",
853 : sub->name, err)));
854 :
855 56 : PG_TRY();
856 : {
857 56 : if (validate_publications)
858 14 : check_publications(wrconn, validate_publications);
859 :
860 : /* Get the table list from publisher. */
861 56 : pubrel_names = fetch_table_list(wrconn, sub->publications);
862 :
863 : /* Get local table list. */
864 56 : subrel_states = GetSubscriptionRelations(sub->oid, false);
865 56 : subrel_count = list_length(subrel_states);
866 :
867 : /*
868 : * Build qsorted array of local table oids for faster lookup. This can
869 : * potentially contain all tables in the database so speed of lookup
870 : * is important.
871 : */
872 56 : subrel_local_oids = palloc(subrel_count * sizeof(Oid));
873 56 : off = 0;
874 218 : foreach(lc, subrel_states)
875 : {
876 162 : SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
877 :
878 162 : subrel_local_oids[off++] = relstate->relid;
879 : }
880 56 : qsort(subrel_local_oids, subrel_count,
881 : sizeof(Oid), oid_cmp);
882 :
883 56 : check_publications_origin(wrconn, sub->publications, copy_data,
884 : sub->origin, subrel_local_oids,
885 : subrel_count, sub->name);
886 :
887 : /*
888 : * Rels that we want to remove from subscription and drop any slots
889 : * and origins corresponding to them.
890 : */
891 56 : sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels));
892 :
893 : /*
894 : * Walk over the remote tables and try to match them to locally known
895 : * tables. If the table is not known locally create a new state for
896 : * it.
897 : *
898 : * Also builds array of local oids of remote tables for the next step.
899 : */
900 56 : off = 0;
901 56 : pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
902 :
903 220 : foreach(lc, pubrel_names)
904 : {
905 164 : RangeVar *rv = (RangeVar *) lfirst(lc);
906 : Oid relid;
907 :
908 164 : relid = RangeVarGetRelid(rv, AccessShareLock, false);
909 :
910 : /* Check for supported relkind. */
911 164 : CheckSubscriptionRelkind(get_rel_relkind(relid),
912 164 : rv->schemaname, rv->relname);
913 :
914 164 : pubrel_local_oids[off++] = relid;
915 :
916 164 : if (!bsearch(&relid, subrel_local_oids,
917 : subrel_count, sizeof(Oid), oid_cmp))
918 : {
919 38 : AddSubscriptionRelState(sub->oid, relid,
920 : copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
921 : InvalidXLogRecPtr, true);
922 38 : ereport(DEBUG1,
923 : (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
924 : rv->schemaname, rv->relname, sub->name)));
925 : }
926 : }
927 :
928 : /*
929 : * Next remove state for tables we should not care about anymore using
930 : * the data we collected above
931 : */
932 56 : qsort(pubrel_local_oids, list_length(pubrel_names),
933 : sizeof(Oid), oid_cmp);
934 :
935 56 : remove_rel_len = 0;
936 218 : for (off = 0; off < subrel_count; off++)
937 : {
938 162 : Oid relid = subrel_local_oids[off];
939 :
940 162 : if (!bsearch(&relid, pubrel_local_oids,
941 162 : list_length(pubrel_names), sizeof(Oid), oid_cmp))
942 : {
943 : char state;
944 : XLogRecPtr statelsn;
945 :
946 : /*
947 : * Lock pg_subscription_rel with AccessExclusiveLock to
948 : * prevent any race conditions with the apply worker
949 : * re-launching workers at the same time this code is trying
950 : * to remove those tables.
951 : *
952 : * Even if new worker for this particular rel is restarted it
953 : * won't be able to make any progress as we hold exclusive
954 : * lock on pg_subscription_rel till the transaction end. It
955 : * will simply exit as there is no corresponding rel entry.
956 : *
957 : * This locking also ensures that the state of rels won't
958 : * change till we are done with this refresh operation.
959 : */
960 36 : if (!rel)
961 14 : rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
962 :
963 : /* Last known rel state. */
964 36 : state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
965 :
966 36 : sub_remove_rels[remove_rel_len].relid = relid;
967 36 : sub_remove_rels[remove_rel_len++].state = state;
968 :
969 36 : RemoveSubscriptionRel(sub->oid, relid);
970 :
971 36 : logicalrep_worker_stop(sub->oid, relid);
972 :
973 : /*
974 : * For READY state, we would have already dropped the
975 : * tablesync origin.
976 : */
977 36 : if (state != SUBREL_STATE_READY)
978 : {
979 : char originname[NAMEDATALEN];
980 :
981 : /*
982 : * Drop the tablesync's origin tracking if exists.
983 : *
984 : * It is possible that the origin is not yet created for
985 : * tablesync worker, this can happen for the states before
986 : * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or
987 : * apply worker can also concurrently try to drop the
988 : * origin and by this time the origin might be already
989 : * removed. For these reasons, passing missing_ok = true.
990 : */
991 0 : ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
992 : sizeof(originname));
993 0 : replorigin_drop_by_name(originname, true, false);
994 : }
995 :
996 36 : ereport(DEBUG1,
997 : (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
998 : get_namespace_name(get_rel_namespace(relid)),
999 : get_rel_name(relid),
1000 : sub->name)));
1001 : }
1002 : }
1003 :
1004 : /*
1005 : * Drop the tablesync slots associated with removed tables. This has
1006 : * to be at the end because otherwise if there is an error while doing
1007 : * the database operations we won't be able to rollback dropped slots.
1008 : */
1009 92 : for (off = 0; off < remove_rel_len; off++)
1010 : {
1011 36 : if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
1012 0 : sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
1013 : {
1014 0 : char syncslotname[NAMEDATALEN] = {0};
1015 :
1016 : /*
1017 : * For READY/SYNCDONE states we know the tablesync slot has
1018 : * already been dropped by the tablesync worker.
1019 : *
1020 : * For other states, there is no certainty, maybe the slot
1021 : * does not exist yet. Also, if we fail after removing some of
1022 : * the slots, next time, it will again try to drop already
1023 : * dropped slots and fail. For these reasons, we allow
1024 : * missing_ok = true for the drop.
1025 : */
1026 0 : ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid,
1027 : syncslotname, sizeof(syncslotname));
1028 0 : ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
1029 : }
1030 : }
1031 : }
1032 0 : PG_FINALLY();
1033 : {
1034 56 : walrcv_disconnect(wrconn);
1035 : }
1036 56 : PG_END_TRY();
1037 :
1038 56 : if (rel)
1039 14 : table_close(rel, NoLock);
1040 56 : }
1041 :
1042 : /*
1043 : * Common checks for altering failover and two_phase options.
1044 : */
1045 : static void
1046 18 : CheckAlterSubOption(Subscription *sub, const char *option,
1047 : bool slot_needs_update, bool isTopLevel)
1048 : {
1049 : /*
1050 : * The checks in this function are required only for failover and
1051 : * two_phase options.
1052 : */
1053 : Assert(strcmp(option, "failover") == 0 ||
1054 : strcmp(option, "two_phase") == 0);
1055 :
1056 : /*
1057 : * Do not allow changing the option if the subscription is enabled. This
1058 : * is because both failover and two_phase options of the slot on the
1059 : * publisher cannot be modified if the slot is currently acquired by the
1060 : * existing walsender.
1061 : *
1062 : * Note that two_phase is enabled (aka changed from 'false' to 'true') on
1063 : * the publisher by the existing walsender, so we could have allowed that
1064 : * even when the subscription is enabled. But we kept this restriction for
1065 : * the sake of consistency and simplicity.
1066 : */
1067 18 : if (sub->enabled)
1068 2 : ereport(ERROR,
1069 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1070 : errmsg("cannot set option \"%s\" for enabled subscription",
1071 : option)));
1072 :
1073 16 : if (slot_needs_update)
1074 : {
1075 : StringInfoData cmd;
1076 :
1077 : /*
1078 : * A valid slot must be associated with the subscription for us to
1079 : * modify any of the slot's properties.
1080 : */
1081 14 : if (!sub->slotname)
1082 0 : ereport(ERROR,
1083 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1084 : errmsg("cannot set option \"%s\" for a subscription that does not have a slot name",
1085 : option)));
1086 :
1087 : /* The changed option of the slot can't be rolled back. */
1088 14 : initStringInfo(&cmd);
1089 14 : appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
1090 :
1091 14 : PreventInTransactionBlock(isTopLevel, cmd.data);
1092 8 : pfree(cmd.data);
1093 : }
1094 10 : }
1095 :
1096 : /*
1097 : * Alter the existing subscription.
1098 : */
1099 : ObjectAddress
1100 450 : AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1101 : bool isTopLevel)
1102 : {
1103 : Relation rel;
1104 : ObjectAddress myself;
1105 : bool nulls[Natts_pg_subscription];
1106 : bool replaces[Natts_pg_subscription];
1107 : Datum values[Natts_pg_subscription];
1108 : HeapTuple tup;
1109 : Oid subid;
1110 450 : bool update_tuple = false;
1111 450 : bool update_failover = false;
1112 450 : bool update_two_phase = false;
1113 : Subscription *sub;
1114 : Form_pg_subscription form;
1115 : bits32 supported_opts;
1116 450 : SubOpts opts = {0};
1117 :
1118 450 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1119 :
1120 : /* Fetch the existing tuple. */
1121 450 : tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
1122 : CStringGetDatum(stmt->subname));
1123 :
1124 450 : if (!HeapTupleIsValid(tup))
1125 6 : ereport(ERROR,
1126 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1127 : errmsg("subscription \"%s\" does not exist",
1128 : stmt->subname)));
1129 :
1130 444 : form = (Form_pg_subscription) GETSTRUCT(tup);
1131 444 : subid = form->oid;
1132 :
1133 : /* must be owner */
1134 444 : if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
1135 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
1136 0 : stmt->subname);
1137 :
1138 444 : sub = GetSubscription(subid, false);
1139 :
1140 : /*
1141 : * Don't allow non-superuser modification of a subscription with
1142 : * password_required=false.
1143 : */
1144 444 : if (!sub->passwordrequired && !superuser())
1145 0 : ereport(ERROR,
1146 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1147 : errmsg("password_required=false is superuser-only"),
1148 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1149 :
1150 : /* Lock the subscription so nobody else can do anything with it. */
1151 444 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
1152 :
1153 : /* Form a new tuple. */
1154 444 : memset(values, 0, sizeof(values));
1155 444 : memset(nulls, false, sizeof(nulls));
1156 444 : memset(replaces, false, sizeof(replaces));
1157 :
1158 444 : switch (stmt->kind)
1159 : {
1160 184 : case ALTER_SUBSCRIPTION_OPTIONS:
1161 : {
1162 184 : supported_opts = (SUBOPT_SLOT_NAME |
1163 : SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
1164 : SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
1165 : SUBOPT_DISABLE_ON_ERR |
1166 : SUBOPT_PASSWORD_REQUIRED |
1167 : SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
1168 : SUBOPT_ORIGIN);
1169 :
1170 184 : parse_subscription_options(pstate, stmt->options,
1171 : supported_opts, &opts);
1172 :
1173 166 : if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1174 : {
1175 : /*
1176 : * The subscription must be disabled to allow slot_name as
1177 : * 'none', otherwise, the apply worker will repeatedly try
1178 : * to stream the data using that slot_name which neither
1179 : * exists on the publisher nor the user will be allowed to
1180 : * create it.
1181 : */
1182 60 : if (sub->enabled && !opts.slot_name)
1183 0 : ereport(ERROR,
1184 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1185 : errmsg("cannot set %s for enabled subscription",
1186 : "slot_name = NONE")));
1187 :
1188 60 : if (opts.slot_name)
1189 6 : values[Anum_pg_subscription_subslotname - 1] =
1190 6 : DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
1191 : else
1192 54 : nulls[Anum_pg_subscription_subslotname - 1] = true;
1193 60 : replaces[Anum_pg_subscription_subslotname - 1] = true;
1194 : }
1195 :
1196 166 : if (opts.synchronous_commit)
1197 : {
1198 6 : values[Anum_pg_subscription_subsynccommit - 1] =
1199 6 : CStringGetTextDatum(opts.synchronous_commit);
1200 6 : replaces[Anum_pg_subscription_subsynccommit - 1] = true;
1201 : }
1202 :
1203 166 : if (IsSet(opts.specified_opts, SUBOPT_BINARY))
1204 : {
1205 18 : values[Anum_pg_subscription_subbinary - 1] =
1206 18 : BoolGetDatum(opts.binary);
1207 18 : replaces[Anum_pg_subscription_subbinary - 1] = true;
1208 : }
1209 :
1210 166 : if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
1211 : {
1212 30 : values[Anum_pg_subscription_substream - 1] =
1213 30 : CharGetDatum(opts.streaming);
1214 30 : replaces[Anum_pg_subscription_substream - 1] = true;
1215 : }
1216 :
1217 166 : if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
1218 : {
1219 : values[Anum_pg_subscription_subdisableonerr - 1]
1220 6 : = BoolGetDatum(opts.disableonerr);
1221 : replaces[Anum_pg_subscription_subdisableonerr - 1]
1222 6 : = true;
1223 : }
1224 :
1225 166 : if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
1226 : {
1227 : /* Non-superuser may not disable password_required. */
1228 12 : if (!opts.passwordrequired && !superuser())
1229 0 : ereport(ERROR,
1230 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1231 : errmsg("password_required=false is superuser-only"),
1232 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1233 :
1234 : values[Anum_pg_subscription_subpasswordrequired - 1]
1235 12 : = BoolGetDatum(opts.passwordrequired);
1236 : replaces[Anum_pg_subscription_subpasswordrequired - 1]
1237 12 : = true;
1238 : }
1239 :
1240 166 : if (IsSet(opts.specified_opts, SUBOPT_RUN_AS_OWNER))
1241 : {
1242 14 : values[Anum_pg_subscription_subrunasowner - 1] =
1243 14 : BoolGetDatum(opts.runasowner);
1244 14 : replaces[Anum_pg_subscription_subrunasowner - 1] = true;
1245 : }
1246 :
1247 166 : if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
1248 : {
1249 : /*
1250 : * We need to update both the slot and the subscription
1251 : * for the two_phase option. We can enable the two_phase
1252 : * option for a slot only once the initial data
1253 : * synchronization is done. This is to avoid missing some
1254 : * data as explained in comments atop worker.c.
1255 : */
1256 4 : update_two_phase = !opts.twophase;
1257 :
1258 4 : CheckAlterSubOption(sub, "two_phase", update_two_phase,
1259 : isTopLevel);
1260 :
1261 : /*
1262 : * Modifying the two_phase slot option requires a slot
1263 : * lookup by slot name, so changing the slot name at the
1264 : * same time is not allowed.
1265 : */
1266 4 : if (update_two_phase &&
1267 2 : IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1268 0 : ereport(ERROR,
1269 : (errcode(ERRCODE_SYNTAX_ERROR),
1270 : errmsg("slot_name and two_phase cannot be altered at the same time")));
1271 :
1272 : /*
1273 : * Note that workers may still survive even if the
1274 : * subscription has been disabled.
1275 : *
1276 : * Ensure workers have already been exited to avoid
1277 : * getting prepared transactions while we are disabling
1278 : * the two_phase option. Otherwise, the changes of an
1279 : * already prepared transaction can be replicated again
1280 : * along with its corresponding commit, leading to
1281 : * duplicate data or errors.
1282 : */
1283 4 : if (logicalrep_workers_find(subid, true, true))
1284 0 : ereport(ERROR,
1285 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1286 : errmsg("cannot alter two_phase when logical replication worker is still running"),
1287 : errhint("Try again after some time.")));
1288 :
1289 : /*
1290 : * two_phase cannot be disabled if there are any
1291 : * uncommitted prepared transactions present otherwise it
1292 : * can lead to duplicate data or errors as explained in
1293 : * the comment above.
1294 : */
1295 4 : if (update_two_phase &&
1296 2 : sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
1297 2 : LookupGXactBySubid(subid))
1298 0 : ereport(ERROR,
1299 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1300 : errmsg("cannot disable two_phase when prepared transactions are present"),
1301 : errhint("Resolve these transactions and try again.")));
1302 :
1303 : /* Change system catalog accordingly */
1304 4 : values[Anum_pg_subscription_subtwophasestate - 1] =
1305 4 : CharGetDatum(opts.twophase ?
1306 : LOGICALREP_TWOPHASE_STATE_PENDING :
1307 : LOGICALREP_TWOPHASE_STATE_DISABLED);
1308 4 : replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1309 : }
1310 :
1311 166 : if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
1312 : {
1313 : /*
1314 : * Similar to the two_phase case above, we need to update
1315 : * the failover option for both the slot and the
1316 : * subscription.
1317 : */
1318 14 : update_failover = true;
1319 :
1320 14 : CheckAlterSubOption(sub, "failover", update_failover,
1321 : isTopLevel);
1322 :
1323 6 : values[Anum_pg_subscription_subfailover - 1] =
1324 6 : BoolGetDatum(opts.failover);
1325 6 : replaces[Anum_pg_subscription_subfailover - 1] = true;
1326 : }
1327 :
1328 158 : if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
1329 : {
1330 6 : values[Anum_pg_subscription_suborigin - 1] =
1331 6 : CStringGetTextDatum(opts.origin);
1332 6 : replaces[Anum_pg_subscription_suborigin - 1] = true;
1333 : }
1334 :
1335 158 : update_tuple = true;
1336 158 : break;
1337 : }
1338 :
1339 80 : case ALTER_SUBSCRIPTION_ENABLED:
1340 : {
1341 80 : parse_subscription_options(pstate, stmt->options,
1342 : SUBOPT_ENABLED, &opts);
1343 : Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
1344 :
1345 80 : if (!sub->slotname && opts.enabled)
1346 6 : ereport(ERROR,
1347 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1348 : errmsg("cannot enable subscription that does not have a slot name")));
1349 :
1350 74 : values[Anum_pg_subscription_subenabled - 1] =
1351 74 : BoolGetDatum(opts.enabled);
1352 74 : replaces[Anum_pg_subscription_subenabled - 1] = true;
1353 :
1354 74 : if (opts.enabled)
1355 42 : ApplyLauncherWakeupAtCommit();
1356 :
1357 74 : update_tuple = true;
1358 74 : break;
1359 : }
1360 :
1361 20 : case ALTER_SUBSCRIPTION_CONNECTION:
1362 : /* Load the library providing us libpq calls. */
1363 20 : load_file("libpqwalreceiver", false);
1364 : /* Check the connection info string. */
1365 20 : walrcv_check_conninfo(stmt->conninfo,
1366 : sub->passwordrequired && !sub->ownersuperuser);
1367 :
1368 14 : values[Anum_pg_subscription_subconninfo - 1] =
1369 14 : CStringGetTextDatum(stmt->conninfo);
1370 14 : replaces[Anum_pg_subscription_subconninfo - 1] = true;
1371 14 : update_tuple = true;
1372 14 : break;
1373 :
1374 28 : case ALTER_SUBSCRIPTION_SET_PUBLICATION:
1375 : {
1376 28 : supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
1377 28 : parse_subscription_options(pstate, stmt->options,
1378 : supported_opts, &opts);
1379 :
1380 28 : values[Anum_pg_subscription_subpublications - 1] =
1381 28 : publicationListToArray(stmt->publication);
1382 28 : replaces[Anum_pg_subscription_subpublications - 1] = true;
1383 :
1384 28 : update_tuple = true;
1385 :
1386 : /* Refresh if user asked us to. */
1387 28 : if (opts.refresh)
1388 : {
1389 22 : if (!sub->enabled)
1390 0 : ereport(ERROR,
1391 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1392 : errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1393 : errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
1394 :
1395 : /*
1396 : * See ALTER_SUBSCRIPTION_REFRESH for details why this is
1397 : * not allowed.
1398 : */
1399 22 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1400 0 : ereport(ERROR,
1401 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1402 : errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1403 : errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1404 :
1405 22 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1406 :
1407 : /* Make sure refresh sees the new list of publications. */
1408 10 : sub->publications = stmt->publication;
1409 :
1410 10 : AlterSubscription_refresh(sub, opts.copy_data,
1411 : stmt->publication);
1412 : }
1413 :
1414 16 : break;
1415 : }
1416 :
1417 54 : case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
1418 : case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
1419 : {
1420 : List *publist;
1421 54 : bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
1422 :
1423 54 : supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
1424 54 : parse_subscription_options(pstate, stmt->options,
1425 : supported_opts, &opts);
1426 :
1427 54 : publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
1428 18 : values[Anum_pg_subscription_subpublications - 1] =
1429 18 : publicationListToArray(publist);
1430 18 : replaces[Anum_pg_subscription_subpublications - 1] = true;
1431 :
1432 18 : update_tuple = true;
1433 :
1434 : /* Refresh if user asked us to. */
1435 18 : if (opts.refresh)
1436 : {
1437 : /* We only need to validate user specified publications. */
1438 6 : List *validate_publications = (isadd) ? stmt->publication : NULL;
1439 :
1440 6 : if (!sub->enabled)
1441 0 : ereport(ERROR,
1442 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1443 : errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1444 : /* translator: %s is an SQL ALTER command */
1445 : errhint("Use %s instead.",
1446 : isadd ?
1447 : "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
1448 : "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
1449 :
1450 : /*
1451 : * See ALTER_SUBSCRIPTION_REFRESH for details why this is
1452 : * not allowed.
1453 : */
1454 6 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1455 0 : ereport(ERROR,
1456 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1457 : errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1458 : /* translator: %s is an SQL ALTER command */
1459 : errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
1460 : isadd ?
1461 : "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
1462 : "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
1463 :
1464 6 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1465 :
1466 : /* Refresh the new list of publications. */
1467 6 : sub->publications = publist;
1468 :
1469 6 : AlterSubscription_refresh(sub, opts.copy_data,
1470 : validate_publications);
1471 : }
1472 :
1473 18 : break;
1474 : }
1475 :
1476 54 : case ALTER_SUBSCRIPTION_REFRESH:
1477 : {
1478 54 : if (!sub->enabled)
1479 6 : ereport(ERROR,
1480 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1481 : errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
1482 :
1483 48 : parse_subscription_options(pstate, stmt->options,
1484 : SUBOPT_COPY_DATA, &opts);
1485 :
1486 : /*
1487 : * The subscription option "two_phase" requires that
1488 : * replication has passed the initial table synchronization
1489 : * phase before the two_phase becomes properly enabled.
1490 : *
1491 : * But, having reached this two-phase commit "enabled" state
1492 : * we must not allow any subsequent table initialization to
1493 : * occur. So the ALTER SUBSCRIPTION ... REFRESH is disallowed
1494 : * when the user had requested two_phase = on mode.
1495 : *
1496 : * The exception to this restriction is when copy_data =
1497 : * false, because when copy_data is false the tablesync will
1498 : * start already in READY state and will exit directly without
1499 : * doing anything.
1500 : *
1501 : * For more details see comments atop worker.c.
1502 : */
1503 48 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1504 0 : ereport(ERROR,
1505 : (errcode(ERRCODE_SYNTAX_ERROR),
1506 : errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"),
1507 : errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1508 :
1509 48 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
1510 :
1511 42 : AlterSubscription_refresh(sub, opts.copy_data, NULL);
1512 :
1513 40 : break;
1514 : }
1515 :
1516 24 : case ALTER_SUBSCRIPTION_SKIP:
1517 : {
1518 24 : parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
1519 :
1520 : /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
1521 : Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
1522 :
1523 : /*
1524 : * If the user sets subskiplsn, we do a sanity check to make
1525 : * sure that the specified LSN is a probable value.
1526 : */
1527 18 : if (!XLogRecPtrIsInvalid(opts.lsn))
1528 : {
1529 : RepOriginId originid;
1530 : char originname[NAMEDATALEN];
1531 : XLogRecPtr remote_lsn;
1532 :
1533 12 : ReplicationOriginNameForLogicalRep(subid, InvalidOid,
1534 : originname, sizeof(originname));
1535 12 : originid = replorigin_by_name(originname, false);
1536 12 : remote_lsn = replorigin_get_progress(originid, false);
1537 :
1538 : /* Check the given LSN is at least a future LSN */
1539 12 : if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn)
1540 0 : ereport(ERROR,
1541 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1542 : errmsg("skip WAL location (LSN %X/%X) must be greater than origin LSN %X/%X",
1543 : LSN_FORMAT_ARGS(opts.lsn),
1544 : LSN_FORMAT_ARGS(remote_lsn))));
1545 : }
1546 :
1547 18 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
1548 18 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
1549 :
1550 18 : update_tuple = true;
1551 18 : break;
1552 : }
1553 :
1554 0 : default:
1555 0 : elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
1556 : stmt->kind);
1557 : }
1558 :
1559 : /* Update the catalog if needed. */
1560 338 : if (update_tuple)
1561 : {
1562 298 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1563 : replaces);
1564 :
1565 298 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1566 :
1567 298 : heap_freetuple(tup);
1568 : }
1569 :
1570 : /*
1571 : * Try to acquire the connection necessary for altering the slot, if
1572 : * needed.
1573 : *
1574 : * This has to be at the end because otherwise if there is an error while
1575 : * doing the database operations we won't be able to rollback altered
1576 : * slot.
1577 : */
1578 338 : if (update_failover || update_two_phase)
1579 : {
1580 : bool must_use_password;
1581 : char *err;
1582 : WalReceiverConn *wrconn;
1583 :
1584 : /* Load the library providing us libpq calls. */
1585 8 : load_file("libpqwalreceiver", false);
1586 :
1587 : /* Try to connect to the publisher. */
1588 8 : must_use_password = sub->passwordrequired && !sub->ownersuperuser;
1589 8 : wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
1590 : sub->name, &err);
1591 8 : if (!wrconn)
1592 0 : ereport(ERROR,
1593 : (errcode(ERRCODE_CONNECTION_FAILURE),
1594 : errmsg("subscription \"%s\" could not connect to the publisher: %s",
1595 : sub->name, err)));
1596 :
1597 8 : PG_TRY();
1598 : {
1599 8 : walrcv_alter_slot(wrconn, sub->slotname,
1600 : update_failover ? &opts.failover : NULL,
1601 : update_two_phase ? &opts.twophase : NULL);
1602 : }
1603 0 : PG_FINALLY();
1604 : {
1605 8 : walrcv_disconnect(wrconn);
1606 : }
1607 8 : PG_END_TRY();
1608 : }
1609 :
1610 338 : table_close(rel, RowExclusiveLock);
1611 :
1612 338 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
1613 :
1614 338 : InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
1615 :
1616 : /* Wake up related replication workers to handle this change quickly. */
1617 338 : LogicalRepWorkersWakeupAtCommit(subid);
1618 :
1619 338 : return myself;
1620 : }
1621 :
1622 : /*
1623 : * Drop a subscription
1624 : */
1625 : void
1626 214 : DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
1627 : {
1628 : Relation rel;
1629 : ObjectAddress myself;
1630 : HeapTuple tup;
1631 : Oid subid;
1632 : Oid subowner;
1633 : Datum datum;
1634 : bool isnull;
1635 : char *subname;
1636 : char *conninfo;
1637 : char *slotname;
1638 : List *subworkers;
1639 : ListCell *lc;
1640 : char originname[NAMEDATALEN];
1641 214 : char *err = NULL;
1642 : WalReceiverConn *wrconn;
1643 : Form_pg_subscription form;
1644 : List *rstates;
1645 : bool must_use_password;
1646 :
1647 : /*
1648 : * Lock pg_subscription with AccessExclusiveLock to ensure that the
1649 : * launcher doesn't restart new worker during dropping the subscription
1650 : */
1651 214 : rel = table_open(SubscriptionRelationId, AccessExclusiveLock);
1652 :
1653 214 : tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
1654 214 : CStringGetDatum(stmt->subname));
1655 :
1656 214 : if (!HeapTupleIsValid(tup))
1657 : {
1658 12 : table_close(rel, NoLock);
1659 :
1660 12 : if (!stmt->missing_ok)
1661 6 : ereport(ERROR,
1662 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1663 : errmsg("subscription \"%s\" does not exist",
1664 : stmt->subname)));
1665 : else
1666 6 : ereport(NOTICE,
1667 : (errmsg("subscription \"%s\" does not exist, skipping",
1668 : stmt->subname)));
1669 :
1670 80 : return;
1671 : }
1672 :
1673 202 : form = (Form_pg_subscription) GETSTRUCT(tup);
1674 202 : subid = form->oid;
1675 202 : subowner = form->subowner;
1676 202 : must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
1677 :
1678 : /* must be owner */
1679 202 : if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
1680 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
1681 0 : stmt->subname);
1682 :
1683 : /* DROP hook for the subscription being removed */
1684 202 : InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
1685 :
1686 : /*
1687 : * Lock the subscription so nobody else can do anything with it (including
1688 : * the replication workers).
1689 : */
1690 202 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
1691 :
1692 : /* Get subname */
1693 202 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
1694 : Anum_pg_subscription_subname);
1695 202 : subname = pstrdup(NameStr(*DatumGetName(datum)));
1696 :
1697 : /* Get conninfo */
1698 202 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
1699 : Anum_pg_subscription_subconninfo);
1700 202 : conninfo = TextDatumGetCString(datum);
1701 :
1702 : /* Get slotname */
1703 202 : datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
1704 : Anum_pg_subscription_subslotname, &isnull);
1705 202 : if (!isnull)
1706 128 : slotname = pstrdup(NameStr(*DatumGetName(datum)));
1707 : else
1708 74 : slotname = NULL;
1709 :
1710 : /*
1711 : * Since dropping a replication slot is not transactional, the replication
1712 : * slot stays dropped even if the transaction rolls back. So we cannot
1713 : * run DROP SUBSCRIPTION inside a transaction block if dropping the
1714 : * replication slot. Also, in this case, we report a message for dropping
1715 : * the subscription to the cumulative stats system.
1716 : *
1717 : * XXX The command name should really be something like "DROP SUBSCRIPTION
1718 : * of a subscription that is associated with a replication slot", but we
1719 : * don't have the proper facilities for that.
1720 : */
1721 202 : if (slotname)
1722 128 : PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
1723 :
1724 196 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
1725 196 : EventTriggerSQLDropAddObject(&myself, true, true);
1726 :
1727 : /* Remove the tuple from catalog. */
1728 196 : CatalogTupleDelete(rel, &tup->t_self);
1729 :
1730 196 : ReleaseSysCache(tup);
1731 :
1732 : /*
1733 : * Stop all the subscription workers immediately.
1734 : *
1735 : * This is necessary if we are dropping the replication slot, so that the
1736 : * slot becomes accessible.
1737 : *
1738 : * It is also necessary if the subscription is disabled and was disabled
1739 : * in the same transaction. Then the workers haven't seen the disabling
1740 : * yet and will still be running, leading to hangs later when we want to
1741 : * drop the replication origin. If the subscription was disabled before
1742 : * this transaction, then there shouldn't be any workers left, so this
1743 : * won't make a difference.
1744 : *
1745 : * New workers won't be started because we hold an exclusive lock on the
1746 : * subscription till the end of the transaction.
1747 : */
1748 196 : subworkers = logicalrep_workers_find(subid, false, true);
1749 324 : foreach(lc, subworkers)
1750 : {
1751 128 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
1752 :
1753 128 : logicalrep_worker_stop(w->subid, w->relid);
1754 : }
1755 196 : list_free(subworkers);
1756 :
1757 : /*
1758 : * Remove the no-longer-useful entry in the launcher's table of apply
1759 : * worker start times.
1760 : *
1761 : * If this transaction rolls back, the launcher might restart a failed
1762 : * apply worker before wal_retrieve_retry_interval milliseconds have
1763 : * elapsed, but that's pretty harmless.
1764 : */
1765 196 : ApplyLauncherForgetWorkerStartTime(subid);
1766 :
1767 : /*
1768 : * Cleanup of tablesync replication origins.
1769 : *
1770 : * Any READY-state relations would already have dealt with clean-ups.
1771 : *
1772 : * Note that the state can't change because we have already stopped both
1773 : * the apply and tablesync workers and they can't restart because of
1774 : * exclusive lock on the subscription.
1775 : */
1776 196 : rstates = GetSubscriptionRelations(subid, true);
1777 202 : foreach(lc, rstates)
1778 : {
1779 6 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
1780 6 : Oid relid = rstate->relid;
1781 :
1782 : /* Only cleanup resources of tablesync workers */
1783 6 : if (!OidIsValid(relid))
1784 0 : continue;
1785 :
1786 : /*
1787 : * Drop the tablesync's origin tracking if exists.
1788 : *
1789 : * It is possible that the origin is not yet created for tablesync
1790 : * worker so passing missing_ok = true. This can happen for the states
1791 : * before SUBREL_STATE_FINISHEDCOPY.
1792 : */
1793 6 : ReplicationOriginNameForLogicalRep(subid, relid, originname,
1794 : sizeof(originname));
1795 6 : replorigin_drop_by_name(originname, true, false);
1796 : }
1797 :
1798 : /* Clean up dependencies */
1799 196 : deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
1800 :
1801 : /* Remove any associated relation synchronization states. */
1802 196 : RemoveSubscriptionRel(subid, InvalidOid);
1803 :
1804 : /* Remove the origin tracking if exists. */
1805 196 : ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
1806 196 : replorigin_drop_by_name(originname, true, false);
1807 :
1808 : /*
1809 : * Tell the cumulative stats system that the subscription is getting
1810 : * dropped.
1811 : */
1812 196 : pgstat_drop_subscription(subid);
1813 :
1814 : /*
1815 : * If there is no slot associated with the subscription, we can finish
1816 : * here.
1817 : */
1818 196 : if (!slotname && rstates == NIL)
1819 : {
1820 74 : table_close(rel, NoLock);
1821 74 : return;
1822 : }
1823 :
1824 : /*
1825 : * Try to acquire the connection necessary for dropping slots.
1826 : *
1827 : * Note: If the slotname is NONE/NULL then we allow the command to finish
1828 : * and users need to manually cleanup the apply and tablesync worker slots
1829 : * later.
1830 : *
1831 : * This has to be at the end because otherwise if there is an error while
1832 : * doing the database operations we won't be able to rollback dropped
1833 : * slot.
1834 : */
1835 122 : load_file("libpqwalreceiver", false);
1836 :
1837 122 : wrconn = walrcv_connect(conninfo, true, true, must_use_password,
1838 : subname, &err);
1839 122 : if (wrconn == NULL)
1840 : {
1841 0 : if (!slotname)
1842 : {
1843 : /* be tidy */
1844 0 : list_free(rstates);
1845 0 : table_close(rel, NoLock);
1846 0 : return;
1847 : }
1848 : else
1849 : {
1850 0 : ReportSlotConnectionError(rstates, subid, slotname, err);
1851 : }
1852 : }
1853 :
1854 122 : PG_TRY();
1855 : {
1856 128 : foreach(lc, rstates)
1857 : {
1858 6 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
1859 6 : Oid relid = rstate->relid;
1860 :
1861 : /* Only cleanup resources of tablesync workers */
1862 6 : if (!OidIsValid(relid))
1863 0 : continue;
1864 :
1865 : /*
1866 : * Drop the tablesync slots associated with removed tables.
1867 : *
1868 : * For SYNCDONE/READY states, the tablesync slot is known to have
1869 : * already been dropped by the tablesync worker.
1870 : *
1871 : * For other states, there is no certainty, maybe the slot does
1872 : * not exist yet. Also, if we fail after removing some of the
1873 : * slots, next time, it will again try to drop already dropped
1874 : * slots and fail. For these reasons, we allow missing_ok = true
1875 : * for the drop.
1876 : */
1877 6 : if (rstate->state != SUBREL_STATE_SYNCDONE)
1878 : {
1879 6 : char syncslotname[NAMEDATALEN] = {0};
1880 :
1881 6 : ReplicationSlotNameForTablesync(subid, relid, syncslotname,
1882 : sizeof(syncslotname));
1883 6 : ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
1884 : }
1885 : }
1886 :
1887 122 : list_free(rstates);
1888 :
1889 : /*
1890 : * If there is a slot associated with the subscription, then drop the
1891 : * replication slot at the publisher.
1892 : */
1893 122 : if (slotname)
1894 122 : ReplicationSlotDropAtPubNode(wrconn, slotname, false);
1895 : }
1896 0 : PG_FINALLY();
1897 : {
1898 122 : walrcv_disconnect(wrconn);
1899 : }
1900 122 : PG_END_TRY();
1901 :
1902 122 : table_close(rel, NoLock);
1903 : }
1904 :
1905 : /*
1906 : * Drop the replication slot at the publisher node using the replication
1907 : * connection.
1908 : *
1909 : * missing_ok - if true then only issue a LOG message if the slot doesn't
1910 : * exist.
1911 : */
1912 : void
1913 490 : ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
1914 : {
1915 : StringInfoData cmd;
1916 :
1917 : Assert(wrconn);
1918 :
1919 490 : load_file("libpqwalreceiver", false);
1920 :
1921 490 : initStringInfo(&cmd);
1922 490 : appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
1923 :
1924 490 : PG_TRY();
1925 : {
1926 : WalRcvExecResult *res;
1927 :
1928 490 : res = walrcv_exec(wrconn, cmd.data, 0, NULL);
1929 :
1930 490 : if (res->status == WALRCV_OK_COMMAND)
1931 : {
1932 : /* NOTICE. Success. */
1933 490 : ereport(NOTICE,
1934 : (errmsg("dropped replication slot \"%s\" on publisher",
1935 : slotname)));
1936 : }
1937 0 : else if (res->status == WALRCV_ERROR &&
1938 0 : missing_ok &&
1939 0 : res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
1940 : {
1941 : /* LOG. Error, but missing_ok = true. */
1942 0 : ereport(LOG,
1943 : (errmsg("could not drop replication slot \"%s\" on publisher: %s",
1944 : slotname, res->err)));
1945 : }
1946 : else
1947 : {
1948 : /* ERROR. */
1949 0 : ereport(ERROR,
1950 : (errcode(ERRCODE_CONNECTION_FAILURE),
1951 : errmsg("could not drop replication slot \"%s\" on publisher: %s",
1952 : slotname, res->err)));
1953 : }
1954 :
1955 490 : walrcv_clear_result(res);
1956 : }
1957 0 : PG_FINALLY();
1958 : {
1959 490 : pfree(cmd.data);
1960 : }
1961 490 : PG_END_TRY();
1962 490 : }
1963 :
1964 : /*
1965 : * Internal workhorse for changing a subscription owner
1966 : */
1967 : static void
1968 18 : AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
1969 : {
1970 : Form_pg_subscription form;
1971 : AclResult aclresult;
1972 :
1973 18 : form = (Form_pg_subscription) GETSTRUCT(tup);
1974 :
1975 18 : if (form->subowner == newOwnerId)
1976 4 : return;
1977 :
1978 14 : if (!object_ownercheck(SubscriptionRelationId, form->oid, GetUserId()))
1979 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
1980 0 : NameStr(form->subname));
1981 :
1982 : /*
1983 : * Don't allow non-superuser modification of a subscription with
1984 : * password_required=false.
1985 : */
1986 14 : if (!form->subpasswordrequired && !superuser())
1987 0 : ereport(ERROR,
1988 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1989 : errmsg("password_required=false is superuser-only"),
1990 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1991 :
1992 : /* Must be able to become new owner */
1993 14 : check_can_set_role(GetUserId(), newOwnerId);
1994 :
1995 : /*
1996 : * current owner must have CREATE on database
1997 : *
1998 : * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
1999 : * other object types behave differently (e.g. you can't give a table to a
2000 : * user who lacks CREATE privileges on a schema).
2001 : */
2002 8 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
2003 : GetUserId(), ACL_CREATE);
2004 8 : if (aclresult != ACLCHECK_OK)
2005 0 : aclcheck_error(aclresult, OBJECT_DATABASE,
2006 0 : get_database_name(MyDatabaseId));
2007 :
2008 8 : form->subowner = newOwnerId;
2009 8 : CatalogTupleUpdate(rel, &tup->t_self, tup);
2010 :
2011 : /* Update owner dependency reference */
2012 8 : changeDependencyOnOwner(SubscriptionRelationId,
2013 : form->oid,
2014 : newOwnerId);
2015 :
2016 8 : InvokeObjectPostAlterHook(SubscriptionRelationId,
2017 : form->oid, 0);
2018 :
2019 : /* Wake up related background processes to handle this change quickly. */
2020 8 : ApplyLauncherWakeupAtCommit();
2021 8 : LogicalRepWorkersWakeupAtCommit(form->oid);
2022 : }
2023 :
2024 : /*
2025 : * Change subscription owner -- by name
2026 : */
2027 : ObjectAddress
2028 18 : AlterSubscriptionOwner(const char *name, Oid newOwnerId)
2029 : {
2030 : Oid subid;
2031 : HeapTuple tup;
2032 : Relation rel;
2033 : ObjectAddress address;
2034 : Form_pg_subscription form;
2035 :
2036 18 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
2037 :
2038 18 : tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
2039 : CStringGetDatum(name));
2040 :
2041 18 : if (!HeapTupleIsValid(tup))
2042 0 : ereport(ERROR,
2043 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2044 : errmsg("subscription \"%s\" does not exist", name)));
2045 :
2046 18 : form = (Form_pg_subscription) GETSTRUCT(tup);
2047 18 : subid = form->oid;
2048 :
2049 18 : AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
2050 :
2051 12 : ObjectAddressSet(address, SubscriptionRelationId, subid);
2052 :
2053 12 : heap_freetuple(tup);
2054 :
2055 12 : table_close(rel, RowExclusiveLock);
2056 :
2057 12 : return address;
2058 : }
2059 :
2060 : /*
2061 : * Change subscription owner -- by OID
2062 : */
2063 : void
2064 0 : AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
2065 : {
2066 : HeapTuple tup;
2067 : Relation rel;
2068 :
2069 0 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
2070 :
2071 0 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
2072 :
2073 0 : if (!HeapTupleIsValid(tup))
2074 0 : ereport(ERROR,
2075 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2076 : errmsg("subscription with OID %u does not exist", subid)));
2077 :
2078 0 : AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
2079 :
2080 0 : heap_freetuple(tup);
2081 :
2082 0 : table_close(rel, RowExclusiveLock);
2083 0 : }
2084 :
2085 : /*
2086 : * Check and log a warning if the publisher has subscribed to the same table
2087 : * from some other publisher. This check is required only if "copy_data = true"
2088 : * and "origin = none" for CREATE SUBSCRIPTION and
2089 : * ALTER SUBSCRIPTION ... REFRESH statements to notify the user that data
2090 : * having origin might have been copied.
2091 : *
2092 : * This check need not be performed on the tables that are already added
2093 : * because incremental sync for those tables will happen through WAL and the
2094 : * origin of the data can be identified from the WAL records.
2095 : *
2096 : * subrel_local_oids contains the list of relation oids that are already
2097 : * present on the subscriber.
2098 : */
2099 : static void
2100 274 : check_publications_origin(WalReceiverConn *wrconn, List *publications,
2101 : bool copydata, char *origin, Oid *subrel_local_oids,
2102 : int subrel_count, char *subname)
2103 : {
2104 : WalRcvExecResult *res;
2105 : StringInfoData cmd;
2106 : TupleTableSlot *slot;
2107 274 : Oid tableRow[1] = {TEXTOID};
2108 274 : List *publist = NIL;
2109 : int i;
2110 :
2111 526 : if (!copydata || !origin ||
2112 252 : (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0))
2113 262 : return;
2114 :
2115 12 : initStringInfo(&cmd);
2116 12 : appendStringInfoString(&cmd,
2117 : "SELECT DISTINCT P.pubname AS pubname\n"
2118 : "FROM pg_publication P,\n"
2119 : " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
2120 : " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
2121 : " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2122 : "WHERE C.oid = GPT.relid AND P.pubname IN (");
2123 12 : GetPublicationsStr(publications, &cmd, true);
2124 12 : appendStringInfoString(&cmd, ")\n");
2125 :
2126 : /*
2127 : * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains
2128 : * the list of relation oids that are already present on the subscriber.
2129 : * This check should be skipped for these tables.
2130 : */
2131 18 : for (i = 0; i < subrel_count; i++)
2132 : {
2133 6 : Oid relid = subrel_local_oids[i];
2134 6 : char *schemaname = get_namespace_name(get_rel_namespace(relid));
2135 6 : char *tablename = get_rel_name(relid);
2136 :
2137 6 : appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
2138 : schemaname, tablename);
2139 : }
2140 :
2141 12 : res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
2142 12 : pfree(cmd.data);
2143 :
2144 12 : if (res->status != WALRCV_OK_TUPLES)
2145 0 : ereport(ERROR,
2146 : (errcode(ERRCODE_CONNECTION_FAILURE),
2147 : errmsg("could not receive list of replicated tables from the publisher: %s",
2148 : res->err)));
2149 :
2150 : /* Process tables. */
2151 12 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2152 16 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2153 : {
2154 : char *pubname;
2155 : bool isnull;
2156 :
2157 4 : pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2158 : Assert(!isnull);
2159 :
2160 4 : ExecClearTuple(slot);
2161 4 : publist = list_append_unique(publist, makeString(pubname));
2162 : }
2163 :
2164 : /*
2165 : * Log a warning if the publisher has subscribed to the same table from
2166 : * some other publisher. We cannot know the origin of data during the
2167 : * initial sync. Data origins can be found only from the WAL by looking at
2168 : * the origin id.
2169 : *
2170 : * XXX: For simplicity, we don't check whether the table has any data or
2171 : * not. If the table doesn't have any data then we don't need to
2172 : * distinguish between data having origin and data not having origin so we
2173 : * can avoid logging a warning in that case.
2174 : */
2175 12 : if (publist)
2176 : {
2177 4 : StringInfo pubnames = makeStringInfo();
2178 :
2179 : /* Prepare the list of publication(s) for warning message. */
2180 4 : GetPublicationsStr(publist, pubnames, false);
2181 4 : ereport(WARNING,
2182 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2183 : errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
2184 : subname),
2185 : errdetail_plural("The subscription being created subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2186 : "The subscription being created subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2187 : list_length(publist), pubnames->data),
2188 : errhint("Verify that initial data copied from the publisher tables did not come from other origins."));
2189 : }
2190 :
2191 12 : ExecDropSingleTupleTableSlot(slot);
2192 :
2193 12 : walrcv_clear_result(res);
2194 : }
2195 :
2196 : /*
2197 : * Get the list of tables which belong to specified publications on the
2198 : * publisher connection.
2199 : *
2200 : * Note that we don't support the case where the column list is different for
2201 : * the same table in different publications to avoid sending unwanted column
2202 : * information for some of the rows. This can happen when both the column
2203 : * list and row filter are specified for different publications.
2204 : */
2205 : static List *
2206 274 : fetch_table_list(WalReceiverConn *wrconn, List *publications)
2207 : {
2208 : WalRcvExecResult *res;
2209 : StringInfoData cmd;
2210 : TupleTableSlot *slot;
2211 274 : Oid tableRow[3] = {TEXTOID, TEXTOID, InvalidOid};
2212 274 : List *tablelist = NIL;
2213 274 : int server_version = walrcv_server_version(wrconn);
2214 274 : bool check_columnlist = (server_version >= 150000);
2215 274 : StringInfo pub_names = makeStringInfo();
2216 :
2217 274 : initStringInfo(&cmd);
2218 :
2219 : /* Build the pub_names comma-separated string. */
2220 274 : GetPublicationsStr(publications, pub_names, true);
2221 :
2222 : /* Get the list of tables from the publisher. */
2223 274 : if (server_version >= 160000)
2224 : {
2225 274 : tableRow[2] = INT2VECTOROID;
2226 :
2227 : /*
2228 : * From version 16, we allowed passing multiple publications to the
2229 : * function pg_get_publication_tables. This helped to filter out the
2230 : * partition table whose ancestor is also published in this
2231 : * publication array.
2232 : *
2233 : * Join pg_get_publication_tables with pg_publication to exclude
2234 : * non-existing publications.
2235 : *
2236 : * Note that attrs are always stored in sorted order so we don't need
2237 : * to worry if different publications have specified them in a
2238 : * different order. See pub_collist_validate.
2239 : */
2240 274 : appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, gpt.attrs\n"
2241 : " FROM pg_class c\n"
2242 : " JOIN pg_namespace n ON n.oid = c.relnamespace\n"
2243 : " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
2244 : " FROM pg_publication\n"
2245 : " WHERE pubname IN ( %s )) AS gpt\n"
2246 : " ON gpt.relid = c.oid\n",
2247 : pub_names->data);
2248 : }
2249 : else
2250 : {
2251 0 : tableRow[2] = NAMEARRAYOID;
2252 0 : appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n");
2253 :
2254 : /* Get column lists for each relation if the publisher supports it */
2255 0 : if (check_columnlist)
2256 0 : appendStringInfoString(&cmd, ", t.attnames\n");
2257 :
2258 0 : appendStringInfo(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
2259 : " WHERE t.pubname IN ( %s )",
2260 : pub_names->data);
2261 : }
2262 :
2263 274 : destroyStringInfo(pub_names);
2264 :
2265 274 : res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow);
2266 274 : pfree(cmd.data);
2267 :
2268 274 : if (res->status != WALRCV_OK_TUPLES)
2269 0 : ereport(ERROR,
2270 : (errcode(ERRCODE_CONNECTION_FAILURE),
2271 : errmsg("could not receive list of replicated tables from the publisher: %s",
2272 : res->err)));
2273 :
2274 : /* Process tables. */
2275 274 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2276 778 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2277 : {
2278 : char *nspname;
2279 : char *relname;
2280 : bool isnull;
2281 : RangeVar *rv;
2282 :
2283 506 : nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2284 : Assert(!isnull);
2285 506 : relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
2286 : Assert(!isnull);
2287 :
2288 506 : rv = makeRangeVar(nspname, relname, -1);
2289 :
2290 506 : if (check_columnlist && list_member(tablelist, rv))
2291 2 : ereport(ERROR,
2292 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2293 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
2294 : nspname, relname));
2295 : else
2296 504 : tablelist = lappend(tablelist, rv);
2297 :
2298 504 : ExecClearTuple(slot);
2299 : }
2300 272 : ExecDropSingleTupleTableSlot(slot);
2301 :
2302 272 : walrcv_clear_result(res);
2303 :
2304 272 : return tablelist;
2305 : }
2306 :
2307 : /*
2308 : * This is to report the connection failure while dropping replication slots.
2309 : * Here, we report the WARNING for all tablesync slots so that user can drop
2310 : * them manually, if required.
2311 : */
2312 : static void
2313 0 : ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
2314 : {
2315 : ListCell *lc;
2316 :
2317 0 : foreach(lc, rstates)
2318 : {
2319 0 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
2320 0 : Oid relid = rstate->relid;
2321 :
2322 : /* Only cleanup resources of tablesync workers */
2323 0 : if (!OidIsValid(relid))
2324 0 : continue;
2325 :
2326 : /*
2327 : * Caller needs to ensure that relstate doesn't change underneath us.
2328 : * See DropSubscription where we get the relstates.
2329 : */
2330 0 : if (rstate->state != SUBREL_STATE_SYNCDONE)
2331 : {
2332 0 : char syncslotname[NAMEDATALEN] = {0};
2333 :
2334 0 : ReplicationSlotNameForTablesync(subid, relid, syncslotname,
2335 : sizeof(syncslotname));
2336 0 : elog(WARNING, "could not drop tablesync replication slot \"%s\"",
2337 : syncslotname);
2338 : }
2339 : }
2340 :
2341 0 : ereport(ERROR,
2342 : (errcode(ERRCODE_CONNECTION_FAILURE),
2343 : errmsg("could not connect to publisher when attempting to drop replication slot \"%s\": %s",
2344 : slotname, err),
2345 : /* translator: %s is an SQL ALTER command */
2346 : errhint("Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
2347 : "ALTER SUBSCRIPTION ... DISABLE",
2348 : "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
2349 : }
2350 :
2351 : /*
2352 : * Check for duplicates in the given list of publications and error out if
2353 : * found one. Add publications to datums as text datums, if datums is not
2354 : * NULL.
2355 : */
2356 : static void
2357 410 : check_duplicates_in_publist(List *publist, Datum *datums)
2358 : {
2359 : ListCell *cell;
2360 410 : int j = 0;
2361 :
2362 942 : foreach(cell, publist)
2363 : {
2364 550 : char *name = strVal(lfirst(cell));
2365 : ListCell *pcell;
2366 :
2367 822 : foreach(pcell, publist)
2368 : {
2369 822 : char *pname = strVal(lfirst(pcell));
2370 :
2371 822 : if (pcell == cell)
2372 532 : break;
2373 :
2374 290 : if (strcmp(name, pname) == 0)
2375 18 : ereport(ERROR,
2376 : (errcode(ERRCODE_DUPLICATE_OBJECT),
2377 : errmsg("publication name \"%s\" used more than once",
2378 : pname)));
2379 : }
2380 :
2381 532 : if (datums)
2382 446 : datums[j++] = CStringGetTextDatum(name);
2383 : }
2384 392 : }
2385 :
2386 : /*
2387 : * Merge current subscription's publications and user-specified publications
2388 : * from ADD/DROP PUBLICATIONS.
2389 : *
2390 : * If addpub is true, we will add the list of publications into oldpublist.
2391 : * Otherwise, we will delete the list of publications from oldpublist. The
2392 : * returned list is a copy, oldpublist itself is not changed.
2393 : *
2394 : * subname is the subscription name, for error messages.
2395 : */
2396 : static List *
2397 54 : merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
2398 : {
2399 : ListCell *lc;
2400 :
2401 54 : oldpublist = list_copy(oldpublist);
2402 :
2403 54 : check_duplicates_in_publist(newpublist, NULL);
2404 :
2405 92 : foreach(lc, newpublist)
2406 : {
2407 68 : char *name = strVal(lfirst(lc));
2408 : ListCell *lc2;
2409 68 : bool found = false;
2410 :
2411 134 : foreach(lc2, oldpublist)
2412 : {
2413 110 : char *pubname = strVal(lfirst(lc2));
2414 :
2415 110 : if (strcmp(name, pubname) == 0)
2416 : {
2417 44 : found = true;
2418 44 : if (addpub)
2419 12 : ereport(ERROR,
2420 : (errcode(ERRCODE_DUPLICATE_OBJECT),
2421 : errmsg("publication \"%s\" is already in subscription \"%s\"",
2422 : name, subname)));
2423 : else
2424 32 : oldpublist = foreach_delete_current(oldpublist, lc2);
2425 :
2426 32 : break;
2427 : }
2428 : }
2429 :
2430 56 : if (addpub && !found)
2431 18 : oldpublist = lappend(oldpublist, makeString(name));
2432 38 : else if (!addpub && !found)
2433 6 : ereport(ERROR,
2434 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2435 : errmsg("publication \"%s\" is not in subscription \"%s\"",
2436 : name, subname)));
2437 : }
2438 :
2439 : /*
2440 : * XXX Probably no strong reason for this, but for now it's to make ALTER
2441 : * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
2442 : */
2443 24 : if (!oldpublist)
2444 6 : ereport(ERROR,
2445 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2446 : errmsg("cannot drop all the publications from a subscription")));
2447 :
2448 18 : return oldpublist;
2449 : }
2450 :
2451 : /*
2452 : * Extract the streaming mode value from a DefElem. This is like
2453 : * defGetBoolean() but also accepts the special value of "parallel".
2454 : */
2455 : char
2456 744 : defGetStreamingMode(DefElem *def)
2457 : {
2458 : /*
2459 : * If no parameter value given, assume "true" is meant.
2460 : */
2461 744 : if (!def->arg)
2462 0 : return LOGICALREP_STREAM_ON;
2463 :
2464 : /*
2465 : * Allow 0, 1, "false", "true", "off", "on" or "parallel".
2466 : */
2467 744 : switch (nodeTag(def->arg))
2468 : {
2469 0 : case T_Integer:
2470 0 : switch (intVal(def->arg))
2471 : {
2472 0 : case 0:
2473 0 : return LOGICALREP_STREAM_OFF;
2474 0 : case 1:
2475 0 : return LOGICALREP_STREAM_ON;
2476 0 : default:
2477 : /* otherwise, error out below */
2478 0 : break;
2479 : }
2480 0 : break;
2481 744 : default:
2482 : {
2483 744 : char *sval = defGetString(def);
2484 :
2485 : /*
2486 : * The set of strings accepted here should match up with the
2487 : * grammar's opt_boolean_or_string production.
2488 : */
2489 1482 : if (pg_strcasecmp(sval, "false") == 0 ||
2490 738 : pg_strcasecmp(sval, "off") == 0)
2491 12 : return LOGICALREP_STREAM_OFF;
2492 1446 : if (pg_strcasecmp(sval, "true") == 0 ||
2493 714 : pg_strcasecmp(sval, "on") == 0)
2494 92 : return LOGICALREP_STREAM_ON;
2495 640 : if (pg_strcasecmp(sval, "parallel") == 0)
2496 634 : return LOGICALREP_STREAM_PARALLEL;
2497 : }
2498 6 : break;
2499 : }
2500 :
2501 6 : ereport(ERROR,
2502 : (errcode(ERRCODE_SYNTAX_ERROR),
2503 : errmsg("%s requires a Boolean value or \"parallel\"",
2504 : def->defname)));
2505 : return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
2506 : }
|