Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * subscriptioncmds.c
4 : * subscription catalog manipulation functions
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/subscriptioncmds.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/htup_details.h"
18 : #include "access/table.h"
19 : #include "access/xact.h"
20 : #include "catalog/catalog.h"
21 : #include "catalog/dependency.h"
22 : #include "catalog/indexing.h"
23 : #include "catalog/namespace.h"
24 : #include "catalog/objectaccess.h"
25 : #include "catalog/objectaddress.h"
26 : #include "catalog/pg_authid_d.h"
27 : #include "catalog/pg_database_d.h"
28 : #include "catalog/pg_subscription.h"
29 : #include "catalog/pg_subscription_rel.h"
30 : #include "catalog/pg_type.h"
31 : #include "commands/dbcommands.h"
32 : #include "commands/defrem.h"
33 : #include "commands/event_trigger.h"
34 : #include "commands/subscriptioncmds.h"
35 : #include "executor/executor.h"
36 : #include "miscadmin.h"
37 : #include "nodes/makefuncs.h"
38 : #include "pgstat.h"
39 : #include "replication/logicallauncher.h"
40 : #include "replication/logicalworker.h"
41 : #include "replication/origin.h"
42 : #include "replication/slot.h"
43 : #include "replication/walreceiver.h"
44 : #include "replication/walsender.h"
45 : #include "replication/worker_internal.h"
46 : #include "storage/lmgr.h"
47 : #include "utils/acl.h"
48 : #include "utils/builtins.h"
49 : #include "utils/guc.h"
50 : #include "utils/lsyscache.h"
51 : #include "utils/memutils.h"
52 : #include "utils/pg_lsn.h"
53 : #include "utils/syscache.h"
54 :
55 : /*
56 : * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
57 : * command.
    : *
    : * Each option occupies its own bit, so several options can be OR-ed
    : * together into one mask.  Such masks are used both for the set of options
    : * a caller accepts (the supported_opts argument of
    : * parse_subscription_options) and for the set the user actually supplied
    : * (SubOpts.specified_opts).
58 : */
59 : #define SUBOPT_CONNECT 0x00000001
60 : #define SUBOPT_ENABLED 0x00000002
61 : #define SUBOPT_CREATE_SLOT 0x00000004
62 : #define SUBOPT_SLOT_NAME 0x00000008
63 : #define SUBOPT_COPY_DATA 0x00000010
64 : #define SUBOPT_SYNCHRONOUS_COMMIT 0x00000020
65 : #define SUBOPT_REFRESH 0x00000040
66 : #define SUBOPT_BINARY 0x00000080
67 : #define SUBOPT_STREAMING 0x00000100
68 : #define SUBOPT_TWOPHASE_COMMIT 0x00000200
69 : #define SUBOPT_DISABLE_ON_ERR 0x00000400
70 : #define SUBOPT_PASSWORD_REQUIRED 0x00000800
71 : #define SUBOPT_RUN_AS_OWNER 0x00001000
72 : #define SUBOPT_LSN 0x00002000
73 : #define SUBOPT_ORIGIN 0x00004000
74 :
75 : /* check if 'val' has ALL of 'bits' set (not merely one of them) */
76 : #define IsSet(val, bits) (((val) & (bits)) == (bits))
77 :
78 : /*
79 : * Structure to hold a bitmap representing the user-provided CREATE/ALTER
80 : * SUBSCRIPTION command options and the parsed/default values of each of them.
    : *
    : * parse_subscription_options() zeroes the whole structure, then fills in
    : * defaults for the options the caller supports, then overrides them with
    : * whatever the user specified.  A field belonging to an unsupported option
    : * therefore keeps its zero value.
81 : */
82 : typedef struct SubOpts
83 : {
84 : bits32 specified_opts; /* SUBOPT_* bits the user gave explicitly */
85 : char *slot_name; /* NULL if not given or given as "none" */
86 : char *synchronous_commit; /* NULL if not given */
87 : bool connect; /* default true */
88 : bool enabled; /* default true; forced false by connect = false */
89 : bool create_slot; /* default true; forced false by connect = false */
90 : bool copy_data; /* default true; forced false by connect = false */
91 : bool refresh; /* default true */
92 : bool binary; /* default false */
93 : char streaming; /* default LOGICALREP_STREAM_OFF */
94 : bool twophase; /* default false */
95 : bool disableonerr; /* default false */
96 : bool passwordrequired; /* default true */
97 : bool runasowner; /* default false */
98 : char *origin; /* default is a copy of LOGICALREP_ORIGIN_ANY */
99 : XLogRecPtr lsn; /* InvalidXLogRecPtr if given as "none" */
100 : } SubOpts;
101 :
102 : static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
103 : static void check_publications_origin(WalReceiverConn *wrconn,
104 : List *publications, bool copydata,
105 : char *origin, Oid *subrel_local_oids,
106 : int subrel_count, char *subname);
107 : static void check_duplicates_in_publist(List *publist, Datum *datums);
108 : static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
109 : static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
110 :
111 :
112 : /*
113 : * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
114 : *
    : * 'pstate' is only handed to errorConflictingDefElem() when an option is
    : * given more than once.
    : * 'stmt_options' is the list of DefElem nodes from the statement.
    : * 'supported_opts' is the mask of SUBOPT_* bits the caller accepts; an
    : * option whose bit is not in this mask is reported as an unrecognized
    : * subscription parameter.
    : * 'opts' is the output: it is zeroed, loaded with the defaults of the
    : * supported options, and then overwritten with the user's values.  Its
    : * specified_opts field records which options were given explicitly.
    : *
115 : * Since not all options can be specified in both commands, this function
116 : * will report an error if mutually exclusive options are specified.
117 : */
118 : static void
119 726 : parse_subscription_options(ParseState *pstate, List *stmt_options,
120 : bits32 supported_opts, SubOpts *opts)
121 : {
122 : ListCell *lc;
123 :
124 : /* Start out with cleared opts. */
125 726 : memset(opts, 0, sizeof(SubOpts));
126 :
127 : /* caller must expect some option */
128 : Assert(supported_opts != 0);
129 :
130 : /* If connect option is supported, these others also need to be. */
131 : Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
132 : IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
133 : SUBOPT_COPY_DATA));
134 :
135 : /* Set default values for the supported options. */
136 726 : if (IsSet(supported_opts, SUBOPT_CONNECT))
137 356 : opts->connect = true;
138 726 : if (IsSet(supported_opts, SUBOPT_ENABLED))
139 398 : opts->enabled = true;
140 726 : if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
141 356 : opts->create_slot = true;
142 726 : if (IsSet(supported_opts, SUBOPT_COPY_DATA))
143 504 : opts->copy_data = true;
144 726 : if (IsSet(supported_opts, SUBOPT_REFRESH))
145 102 : opts->refresh = true;
146 726 : if (IsSet(supported_opts, SUBOPT_BINARY))
147 512 : opts->binary = false;
148 726 : if (IsSet(supported_opts, SUBOPT_STREAMING))
149 512 : opts->streaming = LOGICALREP_STREAM_OFF;
150 726 : if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
151 356 : opts->twophase = false;
152 726 : if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
153 512 : opts->disableonerr = false;
154 726 : if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED))
155 512 : opts->passwordrequired = true;
156 726 : if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER))
157 512 : opts->runasowner = false;
158 726 : if (IsSet(supported_opts, SUBOPT_ORIGIN))
159 512 : opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
160 :
161 : /*
    : * Parse options.  Each branch below follows the same pattern: reject a
    : * repeated option, record it in specified_opts, and store its value.
    : */
162 1390 : foreach(lc, stmt_options)
163 : {
164 724 : DefElem *defel = (DefElem *) lfirst(lc);
165 :
166 724 : if (IsSet(supported_opts, SUBOPT_CONNECT) &&
167 428 : strcmp(defel->defname, "connect") == 0)
168 : {
169 158 : if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
170 0 : errorConflictingDefElem(defel, pstate);
171 :
172 158 : opts->specified_opts |= SUBOPT_CONNECT;
173 158 : opts->connect = defGetBoolean(defel);
174 : }
175 566 : else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
176 312 : strcmp(defel->defname, "enabled") == 0)
177 : {
178 66 : if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
179 0 : errorConflictingDefElem(defel, pstate);
180 :
181 66 : opts->specified_opts |= SUBOPT_ENABLED;
182 66 : opts->enabled = defGetBoolean(defel);
183 : }
184 500 : else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
185 246 : strcmp(defel->defname, "create_slot") == 0)
186 : {
187 32 : if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
188 0 : errorConflictingDefElem(defel, pstate);
189 :
190 32 : opts->specified_opts |= SUBOPT_CREATE_SLOT;
191 32 : opts->create_slot = defGetBoolean(defel);
192 : }
193 468 : else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
194 372 : strcmp(defel->defname, "slot_name") == 0)
195 : {
196 124 : if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
197 0 : errorConflictingDefElem(defel, pstate);
198 :
199 124 : opts->specified_opts |= SUBOPT_SLOT_NAME;
200 124 : opts->slot_name = defGetString(defel);
201 :
202 : /* Setting slot_name = NONE is treated as no slot name. */
203 124 : if (strcmp(opts->slot_name, "none") == 0)
204 106 : opts->slot_name = NULL;
205 : else
206 18 : ReplicationSlotValidateName(opts->slot_name, ERROR);
207 : }
208 344 : else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
209 224 : strcmp(defel->defname, "copy_data") == 0)
210 : {
211 32 : if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
212 0 : errorConflictingDefElem(defel, pstate);
213 :
214 32 : opts->specified_opts |= SUBOPT_COPY_DATA;
215 32 : opts->copy_data = defGetBoolean(defel);
216 : }
217 312 : else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
218 222 : strcmp(defel->defname, "synchronous_commit") == 0)
219 : {
220 12 : if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
221 0 : errorConflictingDefElem(defel, pstate);
222 :
223 12 : opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
224 12 : opts->synchronous_commit = defGetString(defel);
225 :
226 : /* Test if the given value is valid for synchronous_commit GUC. */
227 12 : (void) set_config_option("synchronous_commit", opts->synchronous_commit,
228 : PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
229 : false, 0, false);
230 : }
231 300 : else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
232 66 : strcmp(defel->defname, "refresh") == 0)
233 : {
234 66 : if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
235 0 : errorConflictingDefElem(defel, pstate);
236 :
237 66 : opts->specified_opts |= SUBOPT_REFRESH;
238 66 : opts->refresh = defGetBoolean(defel);
239 : }
240 234 : else if (IsSet(supported_opts, SUBOPT_BINARY) &&
241 210 : strcmp(defel->defname, "binary") == 0)
242 : {
243 32 : if (IsSet(opts->specified_opts, SUBOPT_BINARY))
244 0 : errorConflictingDefElem(defel, pstate);
245 :
246 32 : opts->specified_opts |= SUBOPT_BINARY;
247 32 : opts->binary = defGetBoolean(defel);
248 : }
249 202 : else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
250 178 : strcmp(defel->defname, "streaming") == 0)
251 : {
252 62 : if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
253 0 : errorConflictingDefElem(defel, pstate);
254 :
255 62 : opts->specified_opts |= SUBOPT_STREAMING;
256 62 : opts->streaming = defGetStreamingMode(defel);
257 : }
258 140 : else if (strcmp(defel->defname, "two_phase") == 0)
259 : {
260 : /*
261 : * Do not allow toggling of two_phase option. Doing so could cause
262 : * transactions to be missed and lead to an inconsistent replica.
263 : * See comments atop worker.c
264 : *
265 : * Note: Unsupported twophase indicates that this call originated
266 : * from AlterSubscription.
    : *
    : * Unlike the other branches, the option name is matched before
    : * supported_opts is consulted, so the unsupported case is
    : * reported from inside this branch.
267 : */
268 36 : if (!IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
269 6 : ereport(ERROR,
270 : (errcode(ERRCODE_SYNTAX_ERROR),
271 : errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
272 :
273 30 : if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
274 0 : errorConflictingDefElem(defel, pstate);
275 :
276 30 : opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
277 30 : opts->twophase = defGetBoolean(defel);
278 : }
279 104 : else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
280 80 : strcmp(defel->defname, "disable_on_error") == 0)
281 : {
282 20 : if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
283 0 : errorConflictingDefElem(defel, pstate);
284 :
285 20 : opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
286 20 : opts->disableonerr = defGetBoolean(defel);
287 : }
288 84 : else if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED) &&
289 60 : strcmp(defel->defname, "password_required") == 0)
290 : {
291 22 : if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
292 0 : errorConflictingDefElem(defel, pstate);
293 :
294 22 : opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
295 22 : opts->passwordrequired = defGetBoolean(defel);
296 : }
297 62 : else if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER) &&
298 38 : strcmp(defel->defname, "run_as_owner") == 0)
299 : {
300 2 : if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
301 0 : errorConflictingDefElem(defel, pstate);
302 :
303 2 : opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
304 2 : opts->runasowner = defGetBoolean(defel);
305 : }
306 60 : else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
307 36 : strcmp(defel->defname, "origin") == 0)
308 : {
309 30 : if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
310 0 : errorConflictingDefElem(defel, pstate);
311 :
312 30 : opts->specified_opts |= SUBOPT_ORIGIN;
313 30 : pfree(opts->origin); /* discard the default installed above */
314 :
315 : /*
316 : * Even though the "origin" parameter allows only "none" and "any"
317 : * values, it is implemented as a string type so that the
318 : * parameter can be extended in future versions to support
319 : * filtering using origin names specified by the user.
320 : */
321 30 : opts->origin = defGetString(defel);
322 :
323 44 : if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
324 14 : (pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
325 6 : ereport(ERROR,
326 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
327 : errmsg("unrecognized origin value: \"%s\"", opts->origin));
328 : }
329 30 : else if (IsSet(supported_opts, SUBOPT_LSN) &&
330 24 : strcmp(defel->defname, "lsn") == 0)
331 18 : {
332 24 : char *lsn_str = defGetString(defel);
333 : XLogRecPtr lsn;
334 :
335 24 : if (IsSet(opts->specified_opts, SUBOPT_LSN))
336 0 : errorConflictingDefElem(defel, pstate);
337 :
338 : /* Setting lsn = NONE is treated as resetting LSN */
339 24 : if (strcmp(lsn_str, "none") == 0)
340 6 : lsn = InvalidXLogRecPtr;
341 : else
342 : {
343 : /* Parse the argument as LSN */
344 18 : lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
345 : CStringGetDatum(lsn_str)));
346 :
    /* An explicitly given LSN must not equal the invalid ("none") value. */
347 18 : if (XLogRecPtrIsInvalid(lsn))
348 6 : ereport(ERROR,
349 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
350 : errmsg("invalid WAL location (LSN): %s", lsn_str)));
351 : }
352 :
353 18 : opts->specified_opts |= SUBOPT_LSN;
354 18 : opts->lsn = lsn;
355 : }
356 : else
357 6 : ereport(ERROR,
358 : (errcode(ERRCODE_SYNTAX_ERROR),
359 : errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
360 : }
361 :
362 : /*
363 : * We've been explicitly asked to not connect, that requires some
364 : * additional processing.
    : *
    : * An option that was explicitly set to true conflicts with
    : * connect = false and is an error; one that is true only by default is
    : * silently switched to false below.
365 : */
366 666 : if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
367 : {
368 : /* Check for incompatible options from the user. */
369 128 : if (opts->enabled &&
370 128 : IsSet(opts->specified_opts, SUBOPT_ENABLED))
371 6 : ereport(ERROR,
372 : (errcode(ERRCODE_SYNTAX_ERROR),
373 : /*- translator: both %s are strings of the form "option = value" */
374 : errmsg("%s and %s are mutually exclusive options",
375 : "connect = false", "enabled = true")));
376 :
377 122 : if (opts->create_slot &&
378 116 : IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
379 6 : ereport(ERROR,
380 : (errcode(ERRCODE_SYNTAX_ERROR),
381 : errmsg("%s and %s are mutually exclusive options",
382 : "connect = false", "create_slot = true")));
383 :
384 116 : if (opts->copy_data &&
385 110 : IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
386 6 : ereport(ERROR,
387 : (errcode(ERRCODE_SYNTAX_ERROR),
388 : errmsg("%s and %s are mutually exclusive options",
389 : "connect = false", "copy_data = true")));
390 :
391 : /* Change the defaults of other options. */
392 110 : opts->enabled = false;
393 110 : opts->create_slot = false;
394 110 : opts->copy_data = false;
395 : }
396 :
397 : /*
398 : * Do additional checking for disallowed combination when slot_name = NONE
399 : * was used.
    : *
    : * Here enabled/create_slot being true is always an error; only the
    : * wording differs depending on whether the user set the option
    : * explicitly or it is true merely by default.
400 : */
401 648 : if (!opts->slot_name &&
402 636 : IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
403 : {
404 100 : if (opts->enabled)
405 : {
406 18 : if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
407 6 : ereport(ERROR,
408 : (errcode(ERRCODE_SYNTAX_ERROR),
409 : /*- translator: both %s are strings of the form "option = value" */
410 : errmsg("%s and %s are mutually exclusive options",
411 : "slot_name = NONE", "enabled = true")));
412 : else
413 12 : ereport(ERROR,
414 : (errcode(ERRCODE_SYNTAX_ERROR),
415 : /*- translator: both %s are strings of the form "option = value" */
416 : errmsg("subscription with %s must also set %s",
417 : "slot_name = NONE", "enabled = false")));
418 : }
419 :
420 82 : if (opts->create_slot)
421 : {
422 12 : if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
423 6 : ereport(ERROR,
424 : (errcode(ERRCODE_SYNTAX_ERROR),
425 : /*- translator: both %s are strings of the form "option = value" */
426 : errmsg("%s and %s are mutually exclusive options",
427 : "slot_name = NONE", "create_slot = true")));
428 : else
429 6 : ereport(ERROR,
430 : (errcode(ERRCODE_SYNTAX_ERROR),
431 : /*- translator: both %s are strings of the form "option = value" */
432 : errmsg("subscription with %s must also set %s",
433 : "slot_name = NONE", "create_slot = false")));
434 : }
435 : }
436 618 : }
437 :
438 : /*
439 : * Add publication names from the list to a string.
440 : */
441 : static void
442 420 : get_publications_str(List *publications, StringInfo dest, bool quote_literal)
443 : {
444 : ListCell *lc;
445 420 : bool first = true;
446 :
447 : Assert(publications != NIL);
448 :
449 998 : foreach(lc, publications)
450 : {
451 578 : char *pubname = strVal(lfirst(lc));
452 :
453 578 : if (first)
454 420 : first = false;
455 : else
456 158 : appendStringInfoString(dest, ", ");
457 :
458 578 : if (quote_literal)
459 566 : appendStringInfoString(dest, quote_literal_cstr(pubname));
460 : else
461 : {
462 12 : appendStringInfoChar(dest, '"');
463 12 : appendStringInfoString(dest, pubname);
464 12 : appendStringInfoChar(dest, '"');
465 : }
466 : }
467 420 : }
468 :
469 : /*
470 : * Check that the specified publications are present on the publisher.
471 : */
472 : static void
473 178 : check_publications(WalReceiverConn *wrconn, List *publications)
474 : {
475 : WalRcvExecResult *res;
476 : StringInfo cmd;
477 : TupleTableSlot *slot;
478 178 : List *publicationsCopy = NIL;
479 178 : Oid tableRow[1] = {TEXTOID};
480 :
481 178 : cmd = makeStringInfo();
482 178 : appendStringInfoString(cmd, "SELECT t.pubname FROM\n"
483 : " pg_catalog.pg_publication t WHERE\n"
484 : " t.pubname IN (");
485 178 : get_publications_str(publications, cmd, true);
486 178 : appendStringInfoChar(cmd, ')');
487 :
488 178 : res = walrcv_exec(wrconn, cmd->data, 1, tableRow);
489 178 : pfree(cmd->data);
490 178 : pfree(cmd);
491 :
492 178 : if (res->status != WALRCV_OK_TUPLES)
493 0 : ereport(ERROR,
494 : errmsg("could not receive list of publications from the publisher: %s",
495 : res->err));
496 :
497 178 : publicationsCopy = list_copy(publications);
498 :
499 : /* Process publication(s). */
500 178 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
501 408 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
502 : {
503 : char *pubname;
504 : bool isnull;
505 :
506 230 : pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
507 : Assert(!isnull);
508 :
509 : /* Delete the publication present in publisher from the list. */
510 230 : publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
511 230 : ExecClearTuple(slot);
512 : }
513 :
514 178 : ExecDropSingleTupleTableSlot(slot);
515 :
516 178 : walrcv_clear_result(res);
517 :
518 178 : if (list_length(publicationsCopy))
519 : {
520 : /* Prepare the list of non-existent publication(s) for error message. */
521 6 : StringInfo pubnames = makeStringInfo();
522 :
523 6 : get_publications_str(publicationsCopy, pubnames, false);
524 6 : ereport(WARNING,
525 : errcode(ERRCODE_UNDEFINED_OBJECT),
526 : errmsg_plural("publication %s does not exist on the publisher",
527 : "publications %s do not exist on the publisher",
528 : list_length(publicationsCopy),
529 : pubnames->data));
530 : }
531 178 : }
532 :
533 : /*
534 : * Auxiliary function to build a text array out of a list of String nodes.
535 : */
536 : static Datum
537 296 : publicationListToArray(List *publist)
538 : {
539 : ArrayType *arr;
540 : Datum *datums;
541 : MemoryContext memcxt;
542 : MemoryContext oldcxt;
543 :
544 : /* Create memory context for temporary allocations. */
545 296 : memcxt = AllocSetContextCreate(CurrentMemoryContext,
546 : "publicationListToArray to array",
547 : ALLOCSET_DEFAULT_SIZES);
548 296 : oldcxt = MemoryContextSwitchTo(memcxt);
549 :
550 296 : datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
551 :
552 296 : check_duplicates_in_publist(publist, datums);
553 :
554 290 : MemoryContextSwitchTo(oldcxt);
555 :
556 290 : arr = construct_array_builtin(datums, list_length(publist), TEXTOID);
557 :
558 290 : MemoryContextDelete(memcxt);
559 :
560 290 : return PointerGetDatum(arr);
561 : }
562 :
563 : /*
564 : * Create new subscription.
    : *
    : * Parses stmt->options, performs the privilege checks, inserts the new
    : * pg_subscription row and creates the subscription's replication origin.
    : * Unless connect = false was given, it then connects to the publisher to
    : * check the publications, record the state of each published table and,
    : * if requested, create the replication slot.
    : *
    : * 'isTopLevel' is passed to PreventInTransactionBlock() when a slot is to
    : * be created.
    : *
    : * Returns the ObjectAddress of the new subscription.
565 : */
566 : ObjectAddress
567 356 : CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
568 : bool isTopLevel)
569 : {
570 : Relation rel;
571 : ObjectAddress myself;
572 : Oid subid;
573 : bool nulls[Natts_pg_subscription];
574 : Datum values[Natts_pg_subscription];
575 356 : Oid owner = GetUserId();
576 : HeapTuple tup;
577 : char *conninfo;
578 : char originname[NAMEDATALEN];
579 : List *publications;
580 : bits32 supported_opts;
581 356 : SubOpts opts = {0};
582 : AclResult aclresult;
583 :
584 : /*
585 : * Parse and check options.
586 : *
587 : * Connection and publication should not be specified here.
588 : */
589 356 : supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
590 : SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
591 : SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
592 : SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
593 : SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
594 : SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
595 356 : parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
596 :
597 : /*
598 : * Since creating a replication slot is not transactional, rolling back
599 : * the transaction leaves the created replication slot. So we cannot run
600 : * CREATE SUBSCRIPTION inside a transaction block if creating a
601 : * replication slot.
602 : */
603 278 : if (opts.create_slot)
604 166 : PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
605 :
606 : /*
607 : * We don't want to allow unprivileged users to be able to trigger
608 : * attempts to access arbitrary network destinations, so require the user
609 : * to have been specifically authorized to create subscriptions.
610 : */
611 272 : if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
612 6 : ereport(ERROR,
613 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
614 : errmsg("permission denied to create subscription"),
615 : errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
616 : "pg_create_subscription")));
617 :
618 : /*
619 : * Since a subscription is a database object, we also check for CREATE
620 : * permission on the database.
621 : */
622 266 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
623 : owner, ACL_CREATE);
624 266 : if (aclresult != ACLCHECK_OK)
625 12 : aclcheck_error(aclresult, OBJECT_DATABASE,
626 6 : get_database_name(MyDatabaseId));
627 :
628 : /*
629 : * Non-superusers are required to set a password for authentication, and
630 : * that password must be used by the target server, but the superuser can
631 : * exempt a subscription from this requirement.
632 : */
633 260 : if (!opts.passwordrequired && !superuser_arg(owner))
634 6 : ereport(ERROR,
635 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
636 : errmsg("password_required=false is superuser-only"),
637 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
638 :
639 : /*
640 : * If built with appropriate switch, whine when regression-testing
641 : * conventions for subscription names are violated.
642 : */
643 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
644 : if (strncmp(stmt->subname, "regress_", 8) != 0)
645 : elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
646 : #endif
647 :
648 254 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
649 :
650 : /* Check if name is used */
651 254 : subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
652 : MyDatabaseId, CStringGetDatum(stmt->subname));
653 254 : if (OidIsValid(subid))
654 : {
655 6 : ereport(ERROR,
656 : (errcode(ERRCODE_DUPLICATE_OBJECT),
657 : errmsg("subscription \"%s\" already exists",
658 : stmt->subname)));
659 : }
660 :
    /* With no slot_name option given, the slot is named after the subscription. */
661 248 : if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
662 222 : opts.slot_name == NULL)
663 222 : opts.slot_name = stmt->subname;
664 :
665 : /* The default for synchronous_commit of subscriptions is off. */
666 248 : if (opts.synchronous_commit == NULL)
667 248 : opts.synchronous_commit = "off";
668 :
669 248 : conninfo = stmt->conninfo;
670 248 : publications = stmt->publication;
671 :
672 : /* Load the library providing us libpq calls. */
673 248 : load_file("libpqwalreceiver", false);
674 :
675 : /* Check the connection info string. */
676 248 : walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
677 :
678 : /* Everything ok, form a new tuple. */
679 230 : memset(values, 0, sizeof(values));
680 230 : memset(nulls, false, sizeof(nulls));
681 :
682 230 : subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
683 : Anum_pg_subscription_oid);
684 230 : values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
685 230 : values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
686 230 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
687 230 : values[Anum_pg_subscription_subname - 1] =
688 230 : DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
689 230 : values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
690 230 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
691 230 : values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
692 230 : values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
    /* A requested two_phase starts out PENDING; it may be switched to ENABLED below. */
693 230 : values[Anum_pg_subscription_subtwophasestate - 1] =
694 230 : CharGetDatum(opts.twophase ?
695 : LOGICALREP_TWOPHASE_STATE_PENDING :
696 : LOGICALREP_TWOPHASE_STATE_DISABLED);
697 230 : values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
698 230 : values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
699 230 : values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
700 230 : values[Anum_pg_subscription_subconninfo - 1] =
701 230 : CStringGetTextDatum(conninfo);
    /* slot_name = NONE is stored as a NULL subslotname. */
702 230 : if (opts.slot_name)
703 210 : values[Anum_pg_subscription_subslotname - 1] =
704 210 : DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
705 : else
706 20 : nulls[Anum_pg_subscription_subslotname - 1] = true;
707 230 : values[Anum_pg_subscription_subsynccommit - 1] =
708 230 : CStringGetTextDatum(opts.synchronous_commit);
709 224 : values[Anum_pg_subscription_subpublications - 1] =
710 230 : publicationListToArray(publications);
711 224 : values[Anum_pg_subscription_suborigin - 1] =
712 224 : CStringGetTextDatum(opts.origin);
713 :
714 224 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
715 :
716 : /* Insert tuple into catalog. */
717 224 : CatalogTupleInsert(rel, tup);
718 224 : heap_freetuple(tup);
719 :
720 224 : recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
721 :
    /* Create the replication origin named after the new subscription's OID. */
722 224 : ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
723 224 : replorigin_create(originname);
724 :
725 : /*
726 : * Connect to remote side to execute requested commands and fetch table
727 : * info.
728 : */
729 224 : if (opts.connect)
730 : {
731 : char *err;
732 : WalReceiverConn *wrconn;
733 : List *tables;
734 : ListCell *lc;
735 : char table_state;
736 : bool must_use_password;
737 :
738 : /* Try to connect to the publisher. */
739 150 : must_use_password = !superuser_arg(owner) && opts.passwordrequired;
740 150 : wrconn = walrcv_connect(conninfo, true, must_use_password,
741 : stmt->subname, &err);
742 150 : if (!wrconn)
743 6 : ereport(ERROR,
744 : (errcode(ERRCODE_CONNECTION_FAILURE),
745 : errmsg("could not connect to the publisher: %s", err)));
746 :
    /* PG_FINALLY below closes the connection on both the normal and the error path. */
747 144 : PG_TRY();
748 : {
749 144 : check_publications(wrconn, publications);
750 144 : check_publications_origin(wrconn, publications, opts.copy_data,
751 : opts.origin, NULL, 0, stmt->subname);
752 :
753 : /*
754 : * Set sync state based on whether we were asked to do data copy
755 : * or not.
756 : */
757 144 : table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
758 :
759 : /*
760 : * Get the table list from publisher and build local table status
761 : * info.
762 : */
763 144 : tables = fetch_table_list(wrconn, publications);
764 400 : foreach(lc, tables)
765 : {
766 258 : RangeVar *rv = (RangeVar *) lfirst(lc);
767 : Oid relid;
768 :
769 258 : relid = RangeVarGetRelid(rv, AccessShareLock, false);
770 :
771 : /* Check for supported relkind. */
772 258 : CheckSubscriptionRelkind(get_rel_relkind(relid),
773 258 : rv->schemaname, rv->relname);
774 :
775 258 : AddSubscriptionRelState(subid, relid, table_state,
776 : InvalidXLogRecPtr);
777 : }
778 :
779 : /*
780 : * If requested, create permanent slot for the subscription. We
781 : * won't use the initial snapshot for anything, so no need to
782 : * export it.
783 : */
784 142 : if (opts.create_slot)
785 : {
786 140 : bool twophase_enabled = false;
787 :
788 : Assert(opts.slot_name);
789 :
790 : /*
791 : * Even if two_phase is set, don't create the slot with
792 : * two-phase enabled. Will enable it once all the tables are
793 : * synced and ready. This avoids race-conditions like prepared
794 : * transactions being skipped due to changes not being applied
795 : * due to checks in should_apply_changes_for_rel() when
796 : * tablesync for the corresponding tables are in progress. See
797 : * comments atop worker.c.
798 : *
799 : * Note that if tables were specified but copy_data is false
800 : * then it is safe to enable two_phase up-front because those
801 : * tables are already initially in READY state. When the
802 : * subscription has no tables, we leave the twophase state as
803 : * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
804 : * PUBLICATION to work.
805 : */
806 140 : if (opts.twophase && !opts.copy_data && tables != NIL)
807 2 : twophase_enabled = true;
808 :
809 140 : walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
810 : CRS_NOEXPORT_SNAPSHOT, NULL);
811 :
812 140 : if (twophase_enabled)
813 2 : UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
814 :
815 140 : ereport(NOTICE,
816 : (errmsg("created replication slot \"%s\" on publisher",
817 : opts.slot_name)));
818 : }
819 : }
820 2 : PG_FINALLY();
821 : {
822 144 : walrcv_disconnect(wrconn);
823 : }
824 144 : PG_END_TRY();
825 : }
826 : else
827 74 : ereport(WARNING,
828 : (errmsg("subscription was created, but is not connected"),
829 : errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.")));
830 :
831 216 : table_close(rel, RowExclusiveLock);
832 :
833 216 : pgstat_create_subscription(subid);
834 :
    /* Only an enabled subscription asks for the apply launcher to be woken at commit. */
835 216 : if (opts.enabled)
836 142 : ApplyLauncherWakeupAtCommit();
837 :
838 216 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
839 :
840 216 : InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
841 :
842 216 : return myself;
843 : }
844 :
845 : static void
846 76 : AlterSubscription_refresh(Subscription *sub, bool copy_data,
847 : List *validate_publications)
848 : {
849 : char *err;
850 : List *pubrel_names;
851 : List *subrel_states;
852 : Oid *subrel_local_oids;
853 : Oid *pubrel_local_oids;
854 : ListCell *lc;
855 : int off;
856 : int remove_rel_len;
857 : int subrel_count;
858 76 : Relation rel = NULL;
859 : typedef struct SubRemoveRels
860 : {
861 : Oid relid;
862 : char state;
863 : } SubRemoveRels;
864 : SubRemoveRels *sub_remove_rels;
865 : WalReceiverConn *wrconn;
866 : bool must_use_password;
867 :
868 : /* Load the library providing us libpq calls. */
869 76 : load_file("libpqwalreceiver", false);
870 :
871 : /* Try to connect to the publisher. */
872 76 : must_use_password = !superuser_arg(sub->owner) && sub->passwordrequired;
873 76 : wrconn = walrcv_connect(sub->conninfo, true, must_use_password,
874 : sub->name, &err);
875 76 : if (!wrconn)
876 0 : ereport(ERROR,
877 : (errcode(ERRCODE_CONNECTION_FAILURE),
878 : errmsg("could not connect to the publisher: %s", err)));
879 :
880 76 : PG_TRY();
881 : {
882 76 : if (validate_publications)
883 34 : check_publications(wrconn, validate_publications);
884 :
885 : /* Get the table list from publisher. */
886 76 : pubrel_names = fetch_table_list(wrconn, sub->publications);
887 :
888 : /* Get local table list. */
889 76 : subrel_states = GetSubscriptionRelations(sub->oid, false);
890 76 : subrel_count = list_length(subrel_states);
891 :
892 : /*
893 : * Build qsorted array of local table oids for faster lookup. This can
894 : * potentially contain all tables in the database so speed of lookup
895 : * is important.
896 : */
897 76 : subrel_local_oids = palloc(subrel_count * sizeof(Oid));
898 76 : off = 0;
899 262 : foreach(lc, subrel_states)
900 : {
901 186 : SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
902 :
903 186 : subrel_local_oids[off++] = relstate->relid;
904 : }
905 76 : qsort(subrel_local_oids, subrel_count,
906 : sizeof(Oid), oid_cmp);
907 :
908 76 : check_publications_origin(wrconn, sub->publications, copy_data,
909 : sub->origin, subrel_local_oids,
910 : subrel_count, sub->name);
911 :
912 : /*
913 : * Rels that we want to remove from subscription and drop any slots
914 : * and origins corresponding to them.
915 : */
916 76 : sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels));
917 :
918 : /*
919 : * Walk over the remote tables and try to match them to locally known
920 : * tables. If the table is not known locally create a new state for
921 : * it.
922 : *
923 : * Also builds array of local oids of remote tables for the next step.
924 : */
925 76 : off = 0;
926 76 : pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
927 :
928 260 : foreach(lc, pubrel_names)
929 : {
930 184 : RangeVar *rv = (RangeVar *) lfirst(lc);
931 : Oid relid;
932 :
933 184 : relid = RangeVarGetRelid(rv, AccessShareLock, false);
934 :
935 : /* Check for supported relkind. */
936 184 : CheckSubscriptionRelkind(get_rel_relkind(relid),
937 184 : rv->schemaname, rv->relname);
938 :
939 184 : pubrel_local_oids[off++] = relid;
940 :
941 184 : if (!bsearch(&relid, subrel_local_oids,
942 : subrel_count, sizeof(Oid), oid_cmp))
943 : {
944 58 : AddSubscriptionRelState(sub->oid, relid,
945 : copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
946 : InvalidXLogRecPtr);
947 58 : ereport(DEBUG1,
948 : (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
949 : rv->schemaname, rv->relname, sub->name)));
950 : }
951 : }
952 :
953 : /*
954 : * Next remove state for tables we should not care about anymore using
955 : * the data we collected above
956 : */
957 76 : qsort(pubrel_local_oids, list_length(pubrel_names),
958 : sizeof(Oid), oid_cmp);
959 :
960 76 : remove_rel_len = 0;
961 262 : for (off = 0; off < subrel_count; off++)
962 : {
963 186 : Oid relid = subrel_local_oids[off];
964 :
965 186 : if (!bsearch(&relid, pubrel_local_oids,
966 186 : list_length(pubrel_names), sizeof(Oid), oid_cmp))
967 : {
968 : char state;
969 : XLogRecPtr statelsn;
970 :
971 : /*
972 : * Lock pg_subscription_rel with AccessExclusiveLock to
973 : * prevent any race conditions with the apply worker
974 : * re-launching workers at the same time this code is trying
975 : * to remove those tables.
976 : *
977 : * Even if new worker for this particular rel is restarted it
978 : * won't be able to make any progress as we hold exclusive
979 : * lock on pg_subscription_rel till the transaction end. It
980 : * will simply exit as there is no corresponding rel entry.
981 : *
982 : * This locking also ensures that the state of rels won't
983 : * change till we are done with this refresh operation.
984 : */
985 60 : if (!rel)
986 30 : rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
987 :
988 : /* Last known rel state. */
989 60 : state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
990 :
991 60 : sub_remove_rels[remove_rel_len].relid = relid;
992 60 : sub_remove_rels[remove_rel_len++].state = state;
993 :
994 60 : RemoveSubscriptionRel(sub->oid, relid);
995 :
996 60 : logicalrep_worker_stop(sub->oid, relid);
997 :
998 : /*
999 : * For READY state, we would have already dropped the
1000 : * tablesync origin.
1001 : */
1002 60 : if (state != SUBREL_STATE_READY)
1003 : {
1004 : char originname[NAMEDATALEN];
1005 :
1006 : /*
1007 : * Drop the tablesync's origin tracking if exists.
1008 : *
1009 : * It is possible that the origin is not yet created for
1010 : * tablesync worker, this can happen for the states before
1011 : * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or
1012 : * apply worker can also concurrently try to drop the
1013 : * origin and by this time the origin might be already
1014 : * removed. For these reasons, passing missing_ok = true.
1015 : */
1016 0 : ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
1017 : sizeof(originname));
1018 0 : replorigin_drop_by_name(originname, true, false);
1019 : }
1020 :
1021 60 : ereport(DEBUG1,
1022 : (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
1023 : get_namespace_name(get_rel_namespace(relid)),
1024 : get_rel_name(relid),
1025 : sub->name)));
1026 : }
1027 : }
1028 :
1029 : /*
1030 : * Drop the tablesync slots associated with removed tables. This has
1031 : * to be at the end because otherwise if there is an error while doing
1032 : * the database operations we won't be able to rollback dropped slots.
1033 : */
1034 136 : for (off = 0; off < remove_rel_len; off++)
1035 : {
1036 60 : if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
1037 0 : sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
1038 : {
1039 0 : char syncslotname[NAMEDATALEN] = {0};
1040 :
1041 : /*
1042 : * For READY/SYNCDONE states we know the tablesync slot has
1043 : * already been dropped by the tablesync worker.
1044 : *
1045 : * For other states, there is no certainty, maybe the slot
1046 : * does not exist yet. Also, if we fail after removing some of
1047 : * the slots, next time, it will again try to drop already
1048 : * dropped slots and fail. For these reasons, we allow
1049 : * missing_ok = true for the drop.
1050 : */
1051 0 : ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid,
1052 : syncslotname, sizeof(syncslotname));
1053 0 : ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
1054 : }
1055 : }
1056 : }
1057 0 : PG_FINALLY();
1058 : {
1059 76 : walrcv_disconnect(wrconn);
1060 : }
1061 76 : PG_END_TRY();
1062 :
1063 76 : if (rel)
1064 30 : table_close(rel, NoLock);
1065 76 : }
1066 :
1067 : /*
1068 : * Alter the existing subscription.
1069 : */
1070 : ObjectAddress
1071 396 : AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1072 : bool isTopLevel)
1073 : {
1074 : Relation rel;
1075 : ObjectAddress myself;
1076 : bool nulls[Natts_pg_subscription];
1077 : bool replaces[Natts_pg_subscription];
1078 : Datum values[Natts_pg_subscription];
1079 : HeapTuple tup;
1080 : Oid subid;
1081 396 : bool update_tuple = false;
1082 : Subscription *sub;
1083 : Form_pg_subscription form;
1084 : bits32 supported_opts;
1085 396 : SubOpts opts = {0};
1086 :
1087 396 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1088 :
1089 : /* Fetch the existing tuple. */
1090 396 : tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
1091 : CStringGetDatum(stmt->subname));
1092 :
1093 396 : if (!HeapTupleIsValid(tup))
1094 6 : ereport(ERROR,
1095 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1096 : errmsg("subscription \"%s\" does not exist",
1097 : stmt->subname)));
1098 :
1099 390 : form = (Form_pg_subscription) GETSTRUCT(tup);
1100 390 : subid = form->oid;
1101 :
1102 : /* must be owner */
1103 390 : if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
1104 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
1105 0 : stmt->subname);
1106 :
1107 390 : sub = GetSubscription(subid, false);
1108 :
1109 : /*
1110 : * Don't allow non-superuser modification of a subscription with
1111 : * password_required=false.
1112 : */
1113 390 : if (!sub->passwordrequired && !superuser())
1114 0 : ereport(ERROR,
1115 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1116 : errmsg("password_required=false is superuser-only"),
1117 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1118 :
1119 : /* Lock the subscription so nobody else can do anything with it. */
1120 390 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
1121 :
1122 : /* Form a new tuple. */
1123 390 : memset(values, 0, sizeof(values));
1124 390 : memset(nulls, false, sizeof(nulls));
1125 390 : memset(replaces, false, sizeof(replaces));
1126 :
1127 390 : switch (stmt->kind)
1128 : {
1129 156 : case ALTER_SUBSCRIPTION_OPTIONS:
1130 : {
1131 156 : supported_opts = (SUBOPT_SLOT_NAME |
1132 : SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
1133 : SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
1134 : SUBOPT_PASSWORD_REQUIRED |
1135 : SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
1136 :
1137 156 : parse_subscription_options(pstate, stmt->options,
1138 : supported_opts, &opts);
1139 :
1140 132 : if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1141 : {
1142 : /*
1143 : * The subscription must be disabled to allow slot_name as
1144 : * 'none', otherwise, the apply worker will repeatedly try
1145 : * to stream the data using that slot_name which neither
1146 : * exists on the publisher nor the user will be allowed to
1147 : * create it.
1148 : */
1149 56 : if (sub->enabled && !opts.slot_name)
1150 0 : ereport(ERROR,
1151 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1152 : errmsg("cannot set %s for enabled subscription",
1153 : "slot_name = NONE")));
1154 :
1155 56 : if (opts.slot_name)
1156 6 : values[Anum_pg_subscription_subslotname - 1] =
1157 6 : DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
1158 : else
1159 50 : nulls[Anum_pg_subscription_subslotname - 1] = true;
1160 56 : replaces[Anum_pg_subscription_subslotname - 1] = true;
1161 : }
1162 :
1163 132 : if (opts.synchronous_commit)
1164 : {
1165 6 : values[Anum_pg_subscription_subsynccommit - 1] =
1166 6 : CStringGetTextDatum(opts.synchronous_commit);
1167 6 : replaces[Anum_pg_subscription_subsynccommit - 1] = true;
1168 : }
1169 :
1170 132 : if (IsSet(opts.specified_opts, SUBOPT_BINARY))
1171 : {
1172 18 : values[Anum_pg_subscription_subbinary - 1] =
1173 18 : BoolGetDatum(opts.binary);
1174 18 : replaces[Anum_pg_subscription_subbinary - 1] = true;
1175 : }
1176 :
1177 132 : if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
1178 : {
1179 30 : values[Anum_pg_subscription_substream - 1] =
1180 30 : CharGetDatum(opts.streaming);
1181 30 : replaces[Anum_pg_subscription_substream - 1] = true;
1182 : }
1183 :
1184 132 : if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
1185 : {
1186 : values[Anum_pg_subscription_subdisableonerr - 1]
1187 6 : = BoolGetDatum(opts.disableonerr);
1188 : replaces[Anum_pg_subscription_subdisableonerr - 1]
1189 6 : = true;
1190 : }
1191 :
1192 132 : if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
1193 : {
1194 : /* Non-superuser may not disable password_required. */
1195 12 : if (!opts.passwordrequired && !superuser())
1196 0 : ereport(ERROR,
1197 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1198 : errmsg("password_required=false is superuser-only"),
1199 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1200 :
1201 : values[Anum_pg_subscription_subpasswordrequired - 1]
1202 12 : = BoolGetDatum(opts.passwordrequired);
1203 : replaces[Anum_pg_subscription_subpasswordrequired - 1]
1204 12 : = true;
1205 : }
1206 :
1207 132 : if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
1208 : {
1209 6 : values[Anum_pg_subscription_suborigin - 1] =
1210 6 : CStringGetTextDatum(opts.origin);
1211 6 : replaces[Anum_pg_subscription_suborigin - 1] = true;
1212 : }
1213 :
1214 132 : update_tuple = true;
1215 132 : break;
1216 : }
1217 :
1218 42 : case ALTER_SUBSCRIPTION_ENABLED:
1219 : {
1220 42 : parse_subscription_options(pstate, stmt->options,
1221 : SUBOPT_ENABLED, &opts);
1222 : Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
1223 :
1224 42 : if (!sub->slotname && opts.enabled)
1225 6 : ereport(ERROR,
1226 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1227 : errmsg("cannot enable subscription that does not have a slot name")));
1228 :
1229 36 : values[Anum_pg_subscription_subenabled - 1] =
1230 36 : BoolGetDatum(opts.enabled);
1231 36 : replaces[Anum_pg_subscription_subenabled - 1] = true;
1232 :
1233 36 : if (opts.enabled)
1234 20 : ApplyLauncherWakeupAtCommit();
1235 :
1236 36 : update_tuple = true;
1237 36 : break;
1238 : }
1239 :
1240 14 : case ALTER_SUBSCRIPTION_CONNECTION:
1241 : /* Load the library providing us libpq calls. */
1242 14 : load_file("libpqwalreceiver", false);
1243 : /* Check the connection info string. */
1244 14 : walrcv_check_conninfo(stmt->conninfo,
1245 : sub->passwordrequired && !superuser_arg(sub->owner));
1246 :
1247 8 : values[Anum_pg_subscription_subconninfo - 1] =
1248 8 : CStringGetTextDatum(stmt->conninfo);
1249 8 : replaces[Anum_pg_subscription_subconninfo - 1] = true;
1250 8 : update_tuple = true;
1251 8 : break;
1252 :
1253 48 : case ALTER_SUBSCRIPTION_SET_PUBLICATION:
1254 : {
1255 48 : supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
1256 48 : parse_subscription_options(pstate, stmt->options,
1257 : supported_opts, &opts);
1258 :
1259 48 : values[Anum_pg_subscription_subpublications - 1] =
1260 48 : publicationListToArray(stmt->publication);
1261 48 : replaces[Anum_pg_subscription_subpublications - 1] = true;
1262 :
1263 48 : update_tuple = true;
1264 :
1265 : /* Refresh if user asked us to. */
1266 48 : if (opts.refresh)
1267 : {
1268 42 : if (!sub->enabled)
1269 0 : ereport(ERROR,
1270 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1271 : errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1272 : errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
1273 :
1274 : /*
1275 : * See ALTER_SUBSCRIPTION_REFRESH for details why this is
1276 : * not allowed.
1277 : */
1278 42 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1279 0 : ereport(ERROR,
1280 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1281 : errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1282 : errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1283 :
1284 42 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1285 :
1286 : /* Make sure refresh sees the new list of publications. */
1287 30 : sub->publications = stmt->publication;
1288 :
1289 30 : AlterSubscription_refresh(sub, opts.copy_data,
1290 : stmt->publication);
1291 : }
1292 :
1293 36 : break;
1294 : }
1295 :
1296 54 : case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
1297 : case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
1298 : {
1299 : List *publist;
1300 54 : bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
1301 :
1302 54 : supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
1303 54 : parse_subscription_options(pstate, stmt->options,
1304 : supported_opts, &opts);
1305 :
1306 54 : publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
1307 18 : values[Anum_pg_subscription_subpublications - 1] =
1308 18 : publicationListToArray(publist);
1309 18 : replaces[Anum_pg_subscription_subpublications - 1] = true;
1310 :
1311 18 : update_tuple = true;
1312 :
1313 : /* Refresh if user asked us to. */
1314 18 : if (opts.refresh)
1315 : {
1316 : /* We only need to validate user specified publications. */
1317 6 : List *validate_publications = (isadd) ? stmt->publication : NULL;
1318 :
1319 6 : if (!sub->enabled)
1320 0 : ereport(ERROR,
1321 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1322 : errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1323 : /* translator: %s is an SQL ALTER command */
1324 : errhint("Use %s instead.",
1325 : isadd ?
1326 : "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
1327 : "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
1328 :
1329 : /*
1330 : * See ALTER_SUBSCRIPTION_REFRESH for details why this is
1331 : * not allowed.
1332 : */
1333 6 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1334 0 : ereport(ERROR,
1335 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1336 : errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1337 : /* translator: %s is an SQL ALTER command */
1338 : errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
1339 : isadd ?
1340 : "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
1341 : "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
1342 :
1343 6 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1344 :
1345 : /* Refresh the new list of publications. */
1346 6 : sub->publications = publist;
1347 :
1348 6 : AlterSubscription_refresh(sub, opts.copy_data,
1349 : validate_publications);
1350 : }
1351 :
1352 18 : break;
1353 : }
1354 :
1355 52 : case ALTER_SUBSCRIPTION_REFRESH:
1356 : {
1357 52 : if (!sub->enabled)
1358 6 : ereport(ERROR,
1359 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1360 : errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
1361 :
1362 46 : parse_subscription_options(pstate, stmt->options,
1363 : SUBOPT_COPY_DATA, &opts);
1364 :
1365 : /*
1366 : * The subscription option "two_phase" requires that
1367 : * replication has passed the initial table synchronization
1368 : * phase before the two_phase becomes properly enabled.
1369 : *
1370 : * But, having reached this two-phase commit "enabled" state
1371 : * we must not allow any subsequent table initialization to
1372 : * occur. So the ALTER SUBSCRIPTION ... REFRESH is disallowed
1373 : * when the user had requested two_phase = on mode.
1374 : *
1375 : * The exception to this restriction is when copy_data =
1376 : * false, because when copy_data is false the tablesync will
1377 : * start already in READY state and will exit directly without
1378 : * doing anything.
1379 : *
1380 : * For more details see comments atop worker.c.
1381 : */
1382 46 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1383 0 : ereport(ERROR,
1384 : (errcode(ERRCODE_SYNTAX_ERROR),
1385 : errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"),
1386 : errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1387 :
1388 46 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
1389 :
1390 40 : AlterSubscription_refresh(sub, opts.copy_data, NULL);
1391 :
1392 40 : break;
1393 : }
1394 :
1395 24 : case ALTER_SUBSCRIPTION_SKIP:
1396 : {
1397 24 : parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
1398 :
1399 : /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
1400 : Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
1401 :
1402 : /*
1403 : * If the user sets subskiplsn, we do a sanity check to make
1404 : * sure that the specified LSN is a probable value.
1405 : */
1406 18 : if (!XLogRecPtrIsInvalid(opts.lsn))
1407 : {
1408 : RepOriginId originid;
1409 : char originname[NAMEDATALEN];
1410 : XLogRecPtr remote_lsn;
1411 :
1412 12 : ReplicationOriginNameForLogicalRep(subid, InvalidOid,
1413 : originname, sizeof(originname));
1414 12 : originid = replorigin_by_name(originname, false);
1415 12 : remote_lsn = replorigin_get_progress(originid, false);
1416 :
1417 : /* Check the given LSN is at least a future LSN */
1418 12 : if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn)
1419 0 : ereport(ERROR,
1420 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1421 : errmsg("skip WAL location (LSN %X/%X) must be greater than origin LSN %X/%X",
1422 : LSN_FORMAT_ARGS(opts.lsn),
1423 : LSN_FORMAT_ARGS(remote_lsn))));
1424 : }
1425 :
1426 18 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
1427 18 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
1428 :
1429 18 : update_tuple = true;
1430 18 : break;
1431 : }
1432 :
1433 0 : default:
1434 0 : elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
1435 : stmt->kind);
1436 : }
1437 :
1438 : /* Update the catalog if needed. */
1439 288 : if (update_tuple)
1440 : {
1441 248 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1442 : replaces);
1443 :
1444 248 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1445 :
1446 248 : heap_freetuple(tup);
1447 : }
1448 :
1449 288 : table_close(rel, RowExclusiveLock);
1450 :
1451 288 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
1452 :
1453 288 : InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
1454 :
1455 : /* Wake up related replication workers to handle this change quickly. */
1456 288 : LogicalRepWorkersWakeupAtCommit(subid);
1457 :
1458 288 : return myself;
1459 : }
1460 :
1461 : /*
1462 : * Drop a subscription
1463 : */
1464 : void
1465 158 : DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
1466 : {
1467 : Relation rel;
1468 : ObjectAddress myself;
1469 : HeapTuple tup;
1470 : Oid subid;
1471 : Oid subowner;
1472 : Datum datum;
1473 : bool isnull;
1474 : char *subname;
1475 : char *conninfo;
1476 : char *slotname;
1477 : List *subworkers;
1478 : ListCell *lc;
1479 : char originname[NAMEDATALEN];
1480 158 : char *err = NULL;
1481 : WalReceiverConn *wrconn;
1482 : Form_pg_subscription form;
1483 : List *rstates;
1484 : bool must_use_password;
1485 :
1486 : /*
1487 : * Lock pg_subscription with AccessExclusiveLock to ensure that the
1488 : * launcher doesn't restart new worker during dropping the subscription
1489 : */
1490 158 : rel = table_open(SubscriptionRelationId, AccessExclusiveLock);
1491 :
1492 158 : tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
1493 158 : CStringGetDatum(stmt->subname));
1494 :
1495 158 : if (!HeapTupleIsValid(tup))
1496 : {
1497 12 : table_close(rel, NoLock);
1498 :
1499 12 : if (!stmt->missing_ok)
1500 6 : ereport(ERROR,
1501 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1502 : errmsg("subscription \"%s\" does not exist",
1503 : stmt->subname)));
1504 : else
1505 6 : ereport(NOTICE,
1506 : (errmsg("subscription \"%s\" does not exist, skipping",
1507 : stmt->subname)));
1508 :
1509 74 : return;
1510 : }
1511 :
1512 146 : form = (Form_pg_subscription) GETSTRUCT(tup);
1513 146 : subid = form->oid;
1514 146 : subowner = form->subowner;
1515 146 : must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
1516 :
1517 : /* must be owner */
1518 146 : if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
1519 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
1520 0 : stmt->subname);
1521 :
1522 : /* DROP hook for the subscription being removed */
1523 146 : InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
1524 :
1525 : /*
1526 : * Lock the subscription so nobody else can do anything with it (including
1527 : * the replication workers).
1528 : */
1529 146 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
1530 :
1531 : /* Get subname */
1532 146 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
1533 : Anum_pg_subscription_subname);
1534 146 : subname = pstrdup(NameStr(*DatumGetName(datum)));
1535 :
1536 : /* Get conninfo */
1537 146 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
1538 : Anum_pg_subscription_subconninfo);
1539 146 : conninfo = TextDatumGetCString(datum);
1540 :
1541 : /* Get slotname */
1542 146 : datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
1543 : Anum_pg_subscription_subslotname, &isnull);
1544 146 : if (!isnull)
1545 76 : slotname = pstrdup(NameStr(*DatumGetName(datum)));
1546 : else
1547 70 : slotname = NULL;
1548 :
1549 : /*
1550 : * Since dropping a replication slot is not transactional, the replication
1551 : * slot stays dropped even if the transaction rolls back. So we cannot
1552 : * run DROP SUBSCRIPTION inside a transaction block if dropping the
1553 : * replication slot. Also, in this case, we report a message for dropping
1554 : * the subscription to the cumulative stats system.
1555 : *
1556 : * XXX The command name should really be something like "DROP SUBSCRIPTION
1557 : * of a subscription that is associated with a replication slot", but we
1558 : * don't have the proper facilities for that.
1559 : */
1560 146 : if (slotname)
1561 76 : PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
1562 :
1563 140 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
1564 140 : EventTriggerSQLDropAddObject(&myself, true, true);
1565 :
1566 : /* Remove the tuple from catalog. */
1567 140 : CatalogTupleDelete(rel, &tup->t_self);
1568 :
1569 140 : ReleaseSysCache(tup);
1570 :
1571 : /*
1572 : * Stop all the subscription workers immediately.
1573 : *
1574 : * This is necessary if we are dropping the replication slot, so that the
1575 : * slot becomes accessible.
1576 : *
1577 : * It is also necessary if the subscription is disabled and was disabled
1578 : * in the same transaction. Then the workers haven't seen the disabling
1579 : * yet and will still be running, leading to hangs later when we want to
1580 : * drop the replication origin. If the subscription was disabled before
1581 : * this transaction, then there shouldn't be any workers left, so this
1582 : * won't make a difference.
1583 : *
1584 : * New workers won't be started because we hold an exclusive lock on the
1585 : * subscription till the end of the transaction.
1586 : */
1587 140 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1588 140 : subworkers = logicalrep_workers_find(subid, false);
1589 140 : LWLockRelease(LogicalRepWorkerLock);
1590 218 : foreach(lc, subworkers)
1591 : {
1592 78 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
1593 :
1594 78 : logicalrep_worker_stop(w->subid, w->relid);
1595 : }
1596 140 : list_free(subworkers);
1597 :
1598 : /*
1599 : * Remove the no-longer-useful entry in the launcher's table of apply
1600 : * worker start times.
1601 : *
1602 : * If this transaction rolls back, the launcher might restart a failed
1603 : * apply worker before wal_retrieve_retry_interval milliseconds have
1604 : * elapsed, but that's pretty harmless.
1605 : */
1606 140 : ApplyLauncherForgetWorkerStartTime(subid);
1607 :
1608 : /*
1609 : * Cleanup of tablesync replication origins.
1610 : *
1611 : * Any READY-state relations would already have dealt with clean-ups.
1612 : *
1613 : * Note that the state can't change because we have already stopped both
1614 : * the apply and tablesync workers and they can't restart because of
1615 : * exclusive lock on the subscription.
1616 : */
1617 140 : rstates = GetSubscriptionRelations(subid, true);
1618 144 : foreach(lc, rstates)
1619 : {
1620 4 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
1621 4 : Oid relid = rstate->relid;
1622 :
1623 : /* Only cleanup resources of tablesync workers */
1624 4 : if (!OidIsValid(relid))
1625 0 : continue;
1626 :
1627 : /*
1628 : * Drop the tablesync's origin tracking if exists.
1629 : *
1630 : * It is possible that the origin is not yet created for tablesync
1631 : * worker so passing missing_ok = true. This can happen for the states
1632 : * before SUBREL_STATE_FINISHEDCOPY.
1633 : */
1634 4 : ReplicationOriginNameForLogicalRep(subid, relid, originname,
1635 : sizeof(originname));
1636 4 : replorigin_drop_by_name(originname, true, false);
1637 : }
1638 :
1639 : /* Clean up dependencies */
1640 140 : deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
1641 :
1642 : /* Remove any associated relation synchronization states. */
1643 140 : RemoveSubscriptionRel(subid, InvalidOid);
1644 :
1645 : /* Remove the origin tracking if exists. */
1646 140 : ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
1647 140 : replorigin_drop_by_name(originname, true, false);
1648 :
1649 : /*
1650 : * If there is no slot associated with the subscription, we can finish
1651 : * here.
1652 : */
1653 140 : if (!slotname && rstates == NIL)
1654 : {
1655 68 : table_close(rel, NoLock);
1656 68 : return;
1657 : }
1658 :
1659 : /*
1660 : * Try to acquire the connection necessary for dropping slots.
1661 : *
1662 : * Note: If the slotname is NONE/NULL then we allow the command to finish
1663 : * and users need to manually cleanup the apply and tablesync worker slots
1664 : * later.
1665 : *
1666 : * This has to be at the end because otherwise if there is an error while
1667 : * doing the database operations we won't be able to rollback dropped
1668 : * slot.
1669 : */
1670 72 : load_file("libpqwalreceiver", false);
1671 :
1672 72 : wrconn = walrcv_connect(conninfo, true, must_use_password,
1673 : subname, &err);
1674 72 : if (wrconn == NULL)
1675 : {
1676 0 : if (!slotname)
1677 : {
1678 : /* be tidy */
1679 0 : list_free(rstates);
1680 0 : table_close(rel, NoLock);
1681 0 : return;
1682 : }
1683 : else
1684 : {
1685 0 : ReportSlotConnectionError(rstates, subid, slotname, err);
1686 : }
1687 : }
1688 :
1689 72 : PG_TRY();
1690 : {
1691 76 : foreach(lc, rstates)
1692 : {
1693 4 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
1694 4 : Oid relid = rstate->relid;
1695 :
1696 : /* Only cleanup resources of tablesync workers */
1697 4 : if (!OidIsValid(relid))
1698 0 : continue;
1699 :
1700 : /*
1701 : * Drop the tablesync slots associated with removed tables.
1702 : *
1703 : * For SYNCDONE/READY states, the tablesync slot is known to have
1704 : * already been dropped by the tablesync worker.
1705 : *
1706 : * For other states, there is no certainty, maybe the slot does
1707 : * not exist yet. Also, if we fail after removing some of the
1708 : * slots, next time, it will again try to drop already dropped
1709 : * slots and fail. For these reasons, we allow missing_ok = true
1710 : * for the drop.
1711 : */
1712 4 : if (rstate->state != SUBREL_STATE_SYNCDONE)
1713 : {
1714 2 : char syncslotname[NAMEDATALEN] = {0};
1715 :
1716 2 : ReplicationSlotNameForTablesync(subid, relid, syncslotname,
1717 : sizeof(syncslotname));
1718 2 : ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
1719 : }
1720 : }
1721 :
1722 72 : list_free(rstates);
1723 :
1724 : /*
1725 : * If there is a slot associated with the subscription, then drop the
1726 : * replication slot at the publisher.
1727 : */
1728 72 : if (slotname)
1729 70 : ReplicationSlotDropAtPubNode(wrconn, slotname, false);
1730 : }
1731 0 : PG_FINALLY();
1732 : {
1733 72 : walrcv_disconnect(wrconn);
1734 : }
1735 72 : PG_END_TRY();
1736 :
1737 : /*
1738 : * Tell the cumulative stats system that the subscription is getting
1739 : * dropped.
1740 : */
1741 72 : pgstat_drop_subscription(subid);
1742 :
1743 72 : table_close(rel, NoLock);
1744 : }
1745 :
1746 : /*
1747 : * Drop the replication slot at the publisher node using the replication
1748 : * connection.
1749 : *
1750 : * missing_ok - if true then only issue a LOG message if the slot doesn't
1751 : * exist.
1752 : */
1753 : void
1754 382 : ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
1755 : {
1756 : StringInfoData cmd;
1757 :
1758 : Assert(wrconn);
1759 :
1760 382 : load_file("libpqwalreceiver", false);
1761 :
1762 382 : initStringInfo(&cmd);
1763 382 : appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
1764 :
1765 382 : PG_TRY();
1766 : {
1767 : WalRcvExecResult *res;
1768 :
1769 382 : res = walrcv_exec(wrconn, cmd.data, 0, NULL);
1770 :
1771 382 : if (res->status == WALRCV_OK_COMMAND)
1772 : {
1773 : /* NOTICE. Success. */
1774 382 : ereport(NOTICE,
1775 : (errmsg("dropped replication slot \"%s\" on publisher",
1776 : slotname)));
1777 : }
1778 0 : else if (res->status == WALRCV_ERROR &&
1779 0 : missing_ok &&
1780 0 : res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
1781 : {
1782 : /* LOG. Error, but missing_ok = true. */
1783 0 : ereport(LOG,
1784 : (errmsg("could not drop replication slot \"%s\" on publisher: %s",
1785 : slotname, res->err)));
1786 : }
1787 : else
1788 : {
1789 : /* ERROR. */
1790 0 : ereport(ERROR,
1791 : (errcode(ERRCODE_CONNECTION_FAILURE),
1792 : errmsg("could not drop replication slot \"%s\" on publisher: %s",
1793 : slotname, res->err)));
1794 : }
1795 :
1796 382 : walrcv_clear_result(res);
1797 : }
1798 0 : PG_FINALLY();
1799 : {
1800 382 : pfree(cmd.data);
1801 : }
1802 382 : PG_END_TRY();
1803 382 : }
1804 :
1805 : /*
1806 : * Internal workhorse for changing a subscription owner
1807 : */
1808 : static void
1809 12 : AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
1810 : {
1811 : Form_pg_subscription form;
1812 : AclResult aclresult;
1813 :
1814 12 : form = (Form_pg_subscription) GETSTRUCT(tup);
1815 :
1816 12 : if (form->subowner == newOwnerId)
1817 0 : return;
1818 :
1819 12 : if (!object_ownercheck(SubscriptionRelationId, form->oid, GetUserId()))
1820 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
1821 0 : NameStr(form->subname));
1822 :
1823 : /*
1824 : * Don't allow non-superuser modification of a subscription with
1825 : * password_required=false.
1826 : */
1827 12 : if (!form->subpasswordrequired && !superuser())
1828 0 : ereport(ERROR,
1829 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1830 : errmsg("password_required=false is superuser-only"),
1831 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1832 :
1833 : /* Must be able to become new owner */
1834 12 : check_can_set_role(GetUserId(), newOwnerId);
1835 :
1836 : /*
1837 : * current owner must have CREATE on database
1838 : *
1839 : * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
1840 : * other object types behave differently (e.g. you can't give a table to a
1841 : * user who lacks CREATE privileges on a schema).
1842 : */
1843 6 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
1844 : GetUserId(), ACL_CREATE);
1845 6 : if (aclresult != ACLCHECK_OK)
1846 0 : aclcheck_error(aclresult, OBJECT_DATABASE,
1847 0 : get_database_name(MyDatabaseId));
1848 :
1849 6 : form->subowner = newOwnerId;
1850 6 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1851 :
1852 : /* Update owner dependency reference */
1853 6 : changeDependencyOnOwner(SubscriptionRelationId,
1854 : form->oid,
1855 : newOwnerId);
1856 :
1857 6 : InvokeObjectPostAlterHook(SubscriptionRelationId,
1858 : form->oid, 0);
1859 :
1860 : /* Wake up related background processes to handle this change quickly. */
1861 6 : ApplyLauncherWakeupAtCommit();
1862 6 : LogicalRepWorkersWakeupAtCommit(form->oid);
1863 : }
1864 :
1865 : /*
1866 : * Change subscription owner -- by name
1867 : */
1868 : ObjectAddress
1869 12 : AlterSubscriptionOwner(const char *name, Oid newOwnerId)
1870 : {
1871 : Oid subid;
1872 : HeapTuple tup;
1873 : Relation rel;
1874 : ObjectAddress address;
1875 : Form_pg_subscription form;
1876 :
1877 12 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1878 :
1879 12 : tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
1880 : CStringGetDatum(name));
1881 :
1882 12 : if (!HeapTupleIsValid(tup))
1883 0 : ereport(ERROR,
1884 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1885 : errmsg("subscription \"%s\" does not exist", name)));
1886 :
1887 12 : form = (Form_pg_subscription) GETSTRUCT(tup);
1888 12 : subid = form->oid;
1889 :
1890 12 : AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1891 :
1892 6 : ObjectAddressSet(address, SubscriptionRelationId, subid);
1893 :
1894 6 : heap_freetuple(tup);
1895 :
1896 6 : table_close(rel, RowExclusiveLock);
1897 :
1898 6 : return address;
1899 : }
1900 :
1901 : /*
1902 : * Change subscription owner -- by OID
1903 : */
1904 : void
1905 0 : AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
1906 : {
1907 : HeapTuple tup;
1908 : Relation rel;
1909 :
1910 0 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1911 :
1912 0 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
1913 :
1914 0 : if (!HeapTupleIsValid(tup))
1915 0 : ereport(ERROR,
1916 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1917 : errmsg("subscription with OID %u does not exist", subid)));
1918 :
1919 0 : AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1920 :
1921 0 : heap_freetuple(tup);
1922 :
1923 0 : table_close(rel, RowExclusiveLock);
1924 0 : }
1925 :
1926 : /*
1927 : * Check and log a warning if the publisher has subscribed to the same table
1928 : * from some other publisher. This check is required only if "copy_data = true"
1929 : * and "origin = none" for CREATE SUBSCRIPTION and
1930 : * ALTER SUBSCRIPTION ... REFRESH statements to notify the user that data
1931 : * having origin might have been copied.
1932 : *
1933 : * This check need not be performed on the tables that are already added
1934 : * because incremental sync for those tables will happen through WAL and the
1935 : * origin of the data can be identified from the WAL records.
1936 : *
1937 : * subrel_local_oids contains the list of relation oids that are already
1938 : * present on the subscriber.
1939 : */
1940 : static void
1941 220 : check_publications_origin(WalReceiverConn *wrconn, List *publications,
1942 : bool copydata, char *origin, Oid *subrel_local_oids,
1943 : int subrel_count, char *subname)
1944 : {
1945 : WalRcvExecResult *res;
1946 : StringInfoData cmd;
1947 : TupleTableSlot *slot;
1948 220 : Oid tableRow[1] = {TEXTOID};
1949 220 : List *publist = NIL;
1950 : int i;
1951 :
1952 424 : if (!copydata || !origin ||
1953 204 : (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0))
1954 208 : return;
1955 :
1956 12 : initStringInfo(&cmd);
1957 12 : appendStringInfoString(&cmd,
1958 : "SELECT DISTINCT P.pubname AS pubname\n"
1959 : "FROM pg_publication P,\n"
1960 : " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
1961 : " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
1962 : " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
1963 : "WHERE C.oid = GPT.relid AND P.pubname IN (");
1964 12 : get_publications_str(publications, &cmd, true);
1965 12 : appendStringInfoString(&cmd, ")\n");
1966 :
1967 : /*
1968 : * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains
1969 : * the list of relation oids that are already present on the subscriber.
1970 : * This check should be skipped for these tables.
1971 : */
1972 18 : for (i = 0; i < subrel_count; i++)
1973 : {
1974 6 : Oid relid = subrel_local_oids[i];
1975 6 : char *schemaname = get_namespace_name(get_rel_namespace(relid));
1976 6 : char *tablename = get_rel_name(relid);
1977 :
1978 6 : appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
1979 : schemaname, tablename);
1980 : }
1981 :
1982 12 : res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
1983 12 : pfree(cmd.data);
1984 :
1985 12 : if (res->status != WALRCV_OK_TUPLES)
1986 0 : ereport(ERROR,
1987 : (errcode(ERRCODE_CONNECTION_FAILURE),
1988 : errmsg("could not receive list of replicated tables from the publisher: %s",
1989 : res->err)));
1990 :
1991 : /* Process tables. */
1992 12 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
1993 16 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1994 : {
1995 : char *pubname;
1996 : bool isnull;
1997 :
1998 4 : pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
1999 : Assert(!isnull);
2000 :
2001 4 : ExecClearTuple(slot);
2002 4 : publist = list_append_unique(publist, makeString(pubname));
2003 : }
2004 :
2005 : /*
2006 : * Log a warning if the publisher has subscribed to the same table from
2007 : * some other publisher. We cannot know the origin of data during the
2008 : * initial sync. Data origins can be found only from the WAL by looking at
2009 : * the origin id.
2010 : *
2011 : * XXX: For simplicity, we don't check whether the table has any data or
2012 : * not. If the table doesn't have any data then we don't need to
2013 : * distinguish between data having origin and data not having origin so we
2014 : * can avoid logging a warning in that case.
2015 : */
2016 12 : if (publist)
2017 : {
2018 4 : StringInfo pubnames = makeStringInfo();
2019 :
2020 : /* Prepare the list of publication(s) for warning message. */
2021 4 : get_publications_str(publist, pubnames, false);
2022 4 : ereport(WARNING,
2023 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2024 : errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
2025 : subname),
2026 : errdetail_plural("Subscribed publication %s is subscribing to other publications.",
2027 : "Subscribed publications %s are subscribing to other publications.",
2028 : list_length(publist), pubnames->data),
2029 : errhint("Verify that initial data copied from the publisher tables did not come from other origins."));
2030 : }
2031 :
2032 12 : ExecDropSingleTupleTableSlot(slot);
2033 :
2034 12 : walrcv_clear_result(res);
2035 : }
2036 :
2037 : /*
2038 : * Get the list of tables which belong to specified publications on the
2039 : * publisher connection.
2040 : *
2041 : * Note that we don't support the case where the column list is different for
2042 : * the same table in different publications to avoid sending unwanted column
2043 : * information for some of the rows. This can happen when both the column
2044 : * list and row filter are specified for different publications.
2045 : */
2046 : static List *
2047 220 : fetch_table_list(WalReceiverConn *wrconn, List *publications)
2048 : {
2049 : WalRcvExecResult *res;
2050 : StringInfoData cmd;
2051 : TupleTableSlot *slot;
2052 220 : Oid tableRow[3] = {TEXTOID, TEXTOID, InvalidOid};
2053 220 : List *tablelist = NIL;
2054 220 : int server_version = walrcv_server_version(wrconn);
2055 220 : bool check_columnlist = (server_version >= 150000);
2056 :
2057 220 : initStringInfo(&cmd);
2058 :
2059 : /* Get the list of tables from the publisher. */
2060 220 : if (server_version >= 160000)
2061 : {
2062 : StringInfoData pub_names;
2063 :
2064 220 : tableRow[2] = INT2VECTOROID;
2065 220 : initStringInfo(&pub_names);
2066 220 : get_publications_str(publications, &pub_names, true);
2067 :
2068 : /*
2069 : * From version 16, we allowed passing multiple publications to the
2070 : * function pg_get_publication_tables. This helped to filter out the
2071 : * partition table whose ancestor is also published in this
2072 : * publication array.
2073 : *
2074 : * Join pg_get_publication_tables with pg_publication to exclude
2075 : * non-existing publications.
2076 : *
2077 : * Note that attrs are always stored in sorted order so we don't need
2078 : * to worry if different publications have specified them in a
2079 : * different order. See publication_translate_columns.
2080 : */
2081 220 : appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, gpt.attrs\n"
2082 : " FROM pg_class c\n"
2083 : " JOIN pg_namespace n ON n.oid = c.relnamespace\n"
2084 : " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
2085 : " FROM pg_publication\n"
2086 : " WHERE pubname IN ( %s )) AS gpt\n"
2087 : " ON gpt.relid = c.oid\n",
2088 : pub_names.data);
2089 :
2090 220 : pfree(pub_names.data);
2091 : }
2092 : else
2093 : {
2094 0 : tableRow[2] = NAMEARRAYOID;
2095 0 : appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n");
2096 :
2097 : /* Get column lists for each relation if the publisher supports it */
2098 0 : if (check_columnlist)
2099 0 : appendStringInfoString(&cmd, ", t.attnames\n");
2100 :
2101 0 : appendStringInfoString(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
2102 : " WHERE t.pubname IN (");
2103 0 : get_publications_str(publications, &cmd, true);
2104 0 : appendStringInfoChar(&cmd, ')');
2105 : }
2106 :
2107 220 : res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow);
2108 220 : pfree(cmd.data);
2109 :
2110 220 : if (res->status != WALRCV_OK_TUPLES)
2111 0 : ereport(ERROR,
2112 : (errcode(ERRCODE_CONNECTION_FAILURE),
2113 : errmsg("could not receive list of replicated tables from the publisher: %s",
2114 : res->err)));
2115 :
2116 : /* Process tables. */
2117 220 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2118 664 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2119 : {
2120 : char *nspname;
2121 : char *relname;
2122 : bool isnull;
2123 : RangeVar *rv;
2124 :
2125 446 : nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2126 : Assert(!isnull);
2127 446 : relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
2128 : Assert(!isnull);
2129 :
2130 446 : rv = makeRangeVar(nspname, relname, -1);
2131 :
2132 446 : if (check_columnlist && list_member(tablelist, rv))
2133 2 : ereport(ERROR,
2134 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2135 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
2136 : nspname, relname));
2137 : else
2138 444 : tablelist = lappend(tablelist, rv);
2139 :
2140 444 : ExecClearTuple(slot);
2141 : }
2142 218 : ExecDropSingleTupleTableSlot(slot);
2143 :
2144 218 : walrcv_clear_result(res);
2145 :
2146 218 : return tablelist;
2147 : }
2148 :
2149 : /*
2150 : * This is to report the connection failure while dropping replication slots.
2151 : * Here, we report the WARNING for all tablesync slots so that user can drop
2152 : * them manually, if required.
2153 : */
2154 : static void
2155 0 : ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
2156 : {
2157 : ListCell *lc;
2158 :
2159 0 : foreach(lc, rstates)
2160 : {
2161 0 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
2162 0 : Oid relid = rstate->relid;
2163 :
2164 : /* Only cleanup resources of tablesync workers */
2165 0 : if (!OidIsValid(relid))
2166 0 : continue;
2167 :
2168 : /*
2169 : * Caller needs to ensure that relstate doesn't change underneath us.
2170 : * See DropSubscription where we get the relstates.
2171 : */
2172 0 : if (rstate->state != SUBREL_STATE_SYNCDONE)
2173 : {
2174 0 : char syncslotname[NAMEDATALEN] = {0};
2175 :
2176 0 : ReplicationSlotNameForTablesync(subid, relid, syncslotname,
2177 : sizeof(syncslotname));
2178 0 : elog(WARNING, "could not drop tablesync replication slot \"%s\"",
2179 : syncslotname);
2180 : }
2181 : }
2182 :
2183 0 : ereport(ERROR,
2184 : (errcode(ERRCODE_CONNECTION_FAILURE),
2185 : errmsg("could not connect to publisher when attempting to drop replication slot \"%s\": %s",
2186 : slotname, err),
2187 : /* translator: %s is an SQL ALTER command */
2188 : errhint("Use %s to disassociate the subscription from the slot.",
2189 : "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
2190 : }
2191 :
2192 : /*
2193 : * Check for duplicates in the given list of publications and error out if
2194 : * found one. Add publications to datums as text datums, if datums is not
2195 : * NULL.
2196 : */
2197 : static void
2198 350 : check_duplicates_in_publist(List *publist, Datum *datums)
2199 : {
2200 : ListCell *cell;
2201 350 : int j = 0;
2202 :
2203 822 : foreach(cell, publist)
2204 : {
2205 490 : char *name = strVal(lfirst(cell));
2206 : ListCell *pcell;
2207 :
2208 762 : foreach(pcell, publist)
2209 : {
2210 762 : char *pname = strVal(lfirst(pcell));
2211 :
2212 762 : if (pcell == cell)
2213 472 : break;
2214 :
2215 290 : if (strcmp(name, pname) == 0)
2216 18 : ereport(ERROR,
2217 : (errcode(ERRCODE_DUPLICATE_OBJECT),
2218 : errmsg("publication name \"%s\" used more than once",
2219 : pname)));
2220 : }
2221 :
2222 472 : if (datums)
2223 386 : datums[j++] = CStringGetTextDatum(name);
2224 : }
2225 332 : }
2226 :
2227 : /*
2228 : * Merge current subscription's publications and user-specified publications
2229 : * from ADD/DROP PUBLICATIONS.
2230 : *
2231 : * If addpub is true, we will add the list of publications into oldpublist.
2232 : * Otherwise, we will delete the list of publications from oldpublist. The
2233 : * returned list is a copy, oldpublist itself is not changed.
2234 : *
2235 : * subname is the subscription name, for error messages.
2236 : */
2237 : static List *
2238 54 : merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
2239 : {
2240 : ListCell *lc;
2241 :
2242 54 : oldpublist = list_copy(oldpublist);
2243 :
2244 54 : check_duplicates_in_publist(newpublist, NULL);
2245 :
2246 92 : foreach(lc, newpublist)
2247 : {
2248 68 : char *name = strVal(lfirst(lc));
2249 : ListCell *lc2;
2250 68 : bool found = false;
2251 :
2252 134 : foreach(lc2, oldpublist)
2253 : {
2254 110 : char *pubname = strVal(lfirst(lc2));
2255 :
2256 110 : if (strcmp(name, pubname) == 0)
2257 : {
2258 44 : found = true;
2259 44 : if (addpub)
2260 12 : ereport(ERROR,
2261 : (errcode(ERRCODE_DUPLICATE_OBJECT),
2262 : errmsg("publication \"%s\" is already in subscription \"%s\"",
2263 : name, subname)));
2264 : else
2265 32 : oldpublist = foreach_delete_current(oldpublist, lc2);
2266 :
2267 32 : break;
2268 : }
2269 : }
2270 :
2271 56 : if (addpub && !found)
2272 18 : oldpublist = lappend(oldpublist, makeString(name));
2273 38 : else if (!addpub && !found)
2274 6 : ereport(ERROR,
2275 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2276 : errmsg("publication \"%s\" is not in subscription \"%s\"",
2277 : name, subname)));
2278 : }
2279 :
2280 : /*
2281 : * XXX Probably no strong reason for this, but for now it's to make ALTER
2282 : * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
2283 : */
2284 24 : if (!oldpublist)
2285 6 : ereport(ERROR,
2286 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2287 : errmsg("cannot drop all the publications from a subscription")));
2288 :
2289 18 : return oldpublist;
2290 : }
2291 :
2292 : /*
2293 : * Extract the streaming mode value from a DefElem. This is like
2294 : * defGetBoolean() but also accepts the special value of "parallel".
2295 : */
2296 : char
2297 132 : defGetStreamingMode(DefElem *def)
2298 : {
2299 : /*
2300 : * If no parameter value given, assume "true" is meant.
2301 : */
2302 132 : if (!def->arg)
2303 0 : return LOGICALREP_STREAM_ON;
2304 :
2305 : /*
2306 : * Allow 0, 1, "false", "true", "off", "on" or "parallel".
2307 : */
2308 132 : switch (nodeTag(def->arg))
2309 : {
2310 0 : case T_Integer:
2311 0 : switch (intVal(def->arg))
2312 : {
2313 0 : case 0:
2314 0 : return LOGICALREP_STREAM_OFF;
2315 0 : case 1:
2316 0 : return LOGICALREP_STREAM_ON;
2317 0 : default:
2318 : /* otherwise, error out below */
2319 0 : break;
2320 : }
2321 0 : break;
2322 132 : default:
2323 : {
2324 132 : char *sval = defGetString(def);
2325 :
2326 : /*
2327 : * The set of strings accepted here should match up with the
2328 : * grammar's opt_boolean_or_string production.
2329 : */
2330 258 : if (pg_strcasecmp(sval, "false") == 0 ||
2331 126 : pg_strcasecmp(sval, "off") == 0)
2332 6 : return LOGICALREP_STREAM_OFF;
2333 234 : if (pg_strcasecmp(sval, "true") == 0 ||
2334 108 : pg_strcasecmp(sval, "on") == 0)
2335 90 : return LOGICALREP_STREAM_ON;
2336 36 : if (pg_strcasecmp(sval, "parallel") == 0)
2337 30 : return LOGICALREP_STREAM_PARALLEL;
2338 : }
2339 6 : break;
2340 : }
2341 :
2342 6 : ereport(ERROR,
2343 : (errcode(ERRCODE_SYNTAX_ERROR),
2344 : errmsg("%s requires a Boolean value or \"parallel\"",
2345 : def->defname)));
2346 : return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
2347 : }
|