Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * subscriptioncmds.c
4 : : * subscription catalog manipulation functions
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/commands/subscriptioncmds.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "access/commit_ts.h"
18 : : #include "access/htup_details.h"
19 : : #include "access/table.h"
20 : : #include "access/twophase.h"
21 : : #include "access/xact.h"
22 : : #include "catalog/catalog.h"
23 : : #include "catalog/dependency.h"
24 : : #include "catalog/indexing.h"
25 : : #include "catalog/namespace.h"
26 : : #include "catalog/objectaccess.h"
27 : : #include "catalog/objectaddress.h"
28 : : #include "catalog/pg_authid_d.h"
29 : : #include "catalog/pg_database_d.h"
30 : : #include "catalog/pg_foreign_server.h"
31 : : #include "catalog/pg_subscription.h"
32 : : #include "catalog/pg_subscription_rel.h"
33 : : #include "catalog/pg_type.h"
34 : : #include "catalog/pg_user_mapping.h"
35 : : #include "commands/defrem.h"
36 : : #include "commands/event_trigger.h"
37 : : #include "commands/subscriptioncmds.h"
38 : : #include "executor/executor.h"
39 : : #include "foreign/foreign.h"
40 : : #include "miscadmin.h"
41 : : #include "nodes/makefuncs.h"
42 : : #include "pgstat.h"
43 : : #include "replication/logicallauncher.h"
44 : : #include "replication/logicalworker.h"
45 : : #include "replication/origin.h"
46 : : #include "replication/slot.h"
47 : : #include "replication/walreceiver.h"
48 : : #include "replication/walsender.h"
49 : : #include "replication/worker_internal.h"
50 : : #include "storage/lmgr.h"
51 : : #include "utils/acl.h"
52 : : #include "utils/builtins.h"
53 : : #include "utils/guc.h"
54 : : #include "utils/lsyscache.h"
55 : : #include "utils/memutils.h"
56 : : #include "utils/pg_lsn.h"
57 : : #include "utils/syscache.h"
58 : :
59 : : /*
60 : : * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
61 : : * command.
62 : : */
63 : : #define SUBOPT_CONNECT 0x00000001
64 : : #define SUBOPT_ENABLED 0x00000002
65 : : #define SUBOPT_CREATE_SLOT 0x00000004
66 : : #define SUBOPT_SLOT_NAME 0x00000008
67 : : #define SUBOPT_COPY_DATA 0x00000010
68 : : #define SUBOPT_SYNCHRONOUS_COMMIT 0x00000020
69 : : #define SUBOPT_REFRESH 0x00000040
70 : : #define SUBOPT_BINARY 0x00000080
71 : : #define SUBOPT_STREAMING 0x00000100
72 : : #define SUBOPT_TWOPHASE_COMMIT 0x00000200
73 : : #define SUBOPT_DISABLE_ON_ERR 0x00000400
74 : : #define SUBOPT_PASSWORD_REQUIRED 0x00000800
75 : : #define SUBOPT_RUN_AS_OWNER 0x00001000
76 : : #define SUBOPT_FAILOVER 0x00002000
77 : : #define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000
78 : : #define SUBOPT_MAX_RETENTION_DURATION 0x00008000
79 : : #define SUBOPT_WAL_RECEIVER_TIMEOUT 0x00010000
80 : : #define SUBOPT_LSN 0x00020000
81 : : #define SUBOPT_ORIGIN 0x00040000
82 : :
83 : : /* check if the 'val' has 'bits' set */
84 : : #define IsSet(val, bits) (((val) & (bits)) == (bits))
85 : :
86 : : /*
87 : : * Structure to hold a bitmap representing the user-provided CREATE/ALTER
88 : : * SUBSCRIPTION command options and the parsed/default values of each of them.
89 : : */
90 : : typedef struct SubOpts
91 : : {
92 : : uint32 specified_opts;
93 : : char *slot_name;
94 : : char *synchronous_commit;
95 : : bool connect;
96 : : bool enabled;
97 : : bool create_slot;
98 : : bool copy_data;
99 : : bool refresh;
100 : : bool binary;
101 : : char streaming;
102 : : bool twophase;
103 : : bool disableonerr;
104 : : bool passwordrequired;
105 : : bool runasowner;
106 : : bool failover;
107 : : bool retaindeadtuples;
108 : : int32 maxretention;
109 : : char *origin;
110 : : XLogRecPtr lsn;
111 : : char *wal_receiver_timeout;
112 : : } SubOpts;
113 : :
114 : : /*
115 : : * PublicationRelKind represents a relation included in a publication.
116 : : * It stores the schema-qualified relation name (rv) and its kind (relkind).
117 : : */
118 : : typedef struct PublicationRelKind
119 : : {
120 : : RangeVar *rv;
121 : : char relkind;
122 : : } PublicationRelKind;
123 : :
124 : : static List *fetch_relation_list(WalReceiverConn *wrconn, List *publications);
125 : : static void check_publications_origin_tables(WalReceiverConn *wrconn,
126 : : List *publications, bool copydata,
127 : : bool retain_dead_tuples,
128 : : char *origin,
129 : : Oid *subrel_local_oids,
130 : : int subrel_count, char *subname);
131 : : static void check_publications_origin_sequences(WalReceiverConn *wrconn,
132 : : List *publications,
133 : : bool copydata, char *origin,
134 : : Oid *subrel_local_oids,
135 : : int subrel_count,
136 : : char *subname);
137 : : static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn);
138 : : static void check_duplicates_in_publist(List *publist, Datum *datums);
139 : : static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
140 : : static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
141 : : static void CheckAlterSubOption(Subscription *sub, const char *option,
142 : : bool slot_needs_update, bool isTopLevel);
143 : :
144 : :
145 : : /*
146 : : * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
147 : : *
148 : : * Since not all options can be specified in both commands, this function
149 : : * will report an error if mutually exclusive options are specified.
150 : : */
151 : : static void
1811 dean.a.rasheed@gmail 152 :CBC 640 : parse_subscription_options(ParseState *pstate, List *stmt_options,
153 : : uint32 supported_opts, SubOpts *opts)
154 : : {
155 : : ListCell *lc;
156 : :
157 : : /* Start out with cleared opts. */
1665 michael@paquier.xyz 158 : 640 : memset(opts, 0, sizeof(SubOpts));
159 : :
160 : : /* caller must expect some option */
1820 akapila@postgresql.o 161 [ - + ]: 640 : Assert(supported_opts != 0);
162 : :
163 : : /* If connect option is supported, these others also need to be. */
164 [ + + - + ]: 640 : Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
165 : : IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
166 : : SUBOPT_COPY_DATA));
167 : :
168 : : /* Set default values for the supported options. */
169 [ + + ]: 640 : if (IsSet(supported_opts, SUBOPT_CONNECT))
170 : 312 : opts->connect = true;
171 [ + + ]: 640 : if (IsSet(supported_opts, SUBOPT_ENABLED))
172 : 374 : opts->enabled = true;
173 [ + + ]: 640 : if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
174 : 312 : opts->create_slot = true;
175 [ + + ]: 640 : if (IsSet(supported_opts, SUBOPT_COPY_DATA))
176 : 403 : opts->copy_data = true;
177 [ + + ]: 640 : if (IsSet(supported_opts, SUBOPT_REFRESH))
178 : 54 : opts->refresh = true;
179 [ + + ]: 640 : if (IsSet(supported_opts, SUBOPT_BINARY))
180 : 472 : opts->binary = false;
181 [ + + ]: 640 : if (IsSet(supported_opts, SUBOPT_STREAMING))
610 182 : 472 : opts->streaming = LOGICALREP_STREAM_PARALLEL;
1812 183 [ + + ]: 640 : if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
184 : 472 : opts->twophase = false;
1569 185 [ + + ]: 640 : if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
186 : 472 : opts->disableonerr = false;
1188 rhaas@postgresql.org 187 [ + + ]: 640 : if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED))
188 : 472 : opts->passwordrequired = true;
1183 189 [ + + ]: 640 : if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER))
190 : 472 : opts->runasowner = false;
882 akapila@postgresql.o 191 [ + + ]: 640 : if (IsSet(supported_opts, SUBOPT_FAILOVER))
192 : 472 : opts->failover = false;
342 akapila@postgresql.o 193 [ + + ]:GNC 640 : if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES))
194 : 472 : opts->retaindeadtuples = false;
301 195 [ + + ]: 640 : if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION))
196 : 472 : opts->maxretention = 0;
1440 akapila@postgresql.o 197 [ + + ]:CBC 640 : if (IsSet(supported_opts, SUBOPT_ORIGIN))
198 : 472 : opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
199 : :
200 : : /* Parse options */
1820 201 [ + + + + : 1289 : foreach(lc, stmt_options)
+ + ]
202 : : {
3449 peter_e@gmx.net 203 : 705 : DefElem *defel = (DefElem *) lfirst(lc);
204 : :
1820 akapila@postgresql.o 205 [ + + ]: 705 : if (IsSet(supported_opts, SUBOPT_CONNECT) &&
206 [ + + ]: 417 : strcmp(defel->defname, "connect") == 0)
207 : : {
208 [ - + ]: 149 : if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
1811 dean.a.rasheed@gmail 209 :UBC 0 : errorConflictingDefElem(defel, pstate);
210 : :
1820 akapila@postgresql.o 211 :CBC 149 : opts->specified_opts |= SUBOPT_CONNECT;
212 : 149 : opts->connect = defGetBoolean(defel);
213 : : }
214 [ + + ]: 556 : else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
215 [ + + ]: 330 : strcmp(defel->defname, "enabled") == 0)
216 : : {
217 [ - + ]: 86 : if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
1811 dean.a.rasheed@gmail 218 :UBC 0 : errorConflictingDefElem(defel, pstate);
219 : :
1820 akapila@postgresql.o 220 :CBC 86 : opts->specified_opts |= SUBOPT_ENABLED;
221 : 86 : opts->enabled = defGetBoolean(defel);
222 : : }
223 [ + + ]: 470 : else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
224 [ + + ]: 244 : strcmp(defel->defname, "create_slot") == 0)
225 : : {
226 [ - + ]: 25 : if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
1811 dean.a.rasheed@gmail 227 :UBC 0 : errorConflictingDefElem(defel, pstate);
228 : :
1820 akapila@postgresql.o 229 :CBC 25 : opts->specified_opts |= SUBOPT_CREATE_SLOT;
230 : 25 : opts->create_slot = defGetBoolean(defel);
231 : : }
232 [ + + ]: 445 : else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
233 [ + + ]: 381 : strcmp(defel->defname, "slot_name") == 0)
234 : : {
235 [ - + ]: 129 : if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
1811 dean.a.rasheed@gmail 236 :UBC 0 : errorConflictingDefElem(defel, pstate);
237 : :
1820 akapila@postgresql.o 238 :CBC 129 : opts->specified_opts |= SUBOPT_SLOT_NAME;
239 : 129 : opts->slot_name = defGetString(defel);
240 : :
241 : : /* Setting slot_name = NONE is treated as no slot name. */
242 [ + + ]: 254 : if (strcmp(opts->slot_name, "none") == 0)
243 : 101 : opts->slot_name = NULL;
244 : : else
342 akapila@postgresql.o 245 :GNC 28 : ReplicationSlotValidateName(opts->slot_name, false, ERROR);
246 : : }
1820 akapila@postgresql.o 247 [ + + ]:CBC 316 : else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
248 [ + + ]: 198 : strcmp(defel->defname, "copy_data") == 0)
249 : : {
250 [ - + ]: 31 : if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
1811 dean.a.rasheed@gmail 251 :UBC 0 : errorConflictingDefElem(defel, pstate);
252 : :
1820 akapila@postgresql.o 253 :CBC 31 : opts->specified_opts |= SUBOPT_COPY_DATA;
254 : 31 : opts->copy_data = defGetBoolean(defel);
255 : : }
256 [ + + ]: 285 : else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
257 [ + + ]: 226 : strcmp(defel->defname, "synchronous_commit") == 0)
258 : : {
259 [ - + ]: 8 : if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
1811 dean.a.rasheed@gmail 260 :UBC 0 : errorConflictingDefElem(defel, pstate);
261 : :
1820 akapila@postgresql.o 262 :CBC 8 : opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
263 : 8 : opts->synchronous_commit = defGetString(defel);
264 : :
265 : : /* Test if the given value is valid for synchronous_commit GUC. */
266 : 8 : (void) set_config_option("synchronous_commit", opts->synchronous_commit,
267 : : PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
268 : : false, 0, false);
269 : : }
270 [ + + ]: 277 : else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
271 [ + - ]: 44 : strcmp(defel->defname, "refresh") == 0)
272 : : {
273 [ - + ]: 44 : if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
1811 dean.a.rasheed@gmail 274 :UBC 0 : errorConflictingDefElem(defel, pstate);
275 : :
1820 akapila@postgresql.o 276 :CBC 44 : opts->specified_opts |= SUBOPT_REFRESH;
277 : 44 : opts->refresh = defGetBoolean(defel);
278 : : }
279 [ + + ]: 233 : else if (IsSet(supported_opts, SUBOPT_BINARY) &&
280 [ + + ]: 218 : strcmp(defel->defname, "binary") == 0)
281 : : {
282 [ - + ]: 19 : if (IsSet(opts->specified_opts, SUBOPT_BINARY))
1811 dean.a.rasheed@gmail 283 :UBC 0 : errorConflictingDefElem(defel, pstate);
284 : :
1820 akapila@postgresql.o 285 :CBC 19 : opts->specified_opts |= SUBOPT_BINARY;
286 : 19 : opts->binary = defGetBoolean(defel);
287 : : }
288 [ + + ]: 214 : else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
289 [ + + ]: 199 : strcmp(defel->defname, "streaming") == 0)
290 : : {
291 [ - + ]: 43 : if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
1811 dean.a.rasheed@gmail 292 :UBC 0 : errorConflictingDefElem(defel, pstate);
293 : :
1820 akapila@postgresql.o 294 :CBC 43 : opts->specified_opts |= SUBOPT_STREAMING;
1268 295 : 43 : opts->streaming = defGetStreamingMode(defel);
296 : : }
706 297 [ + + ]: 171 : else if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
298 [ + + ]: 156 : strcmp(defel->defname, "two_phase") == 0)
299 : : {
1812 300 [ - + ]: 24 : if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
1811 dean.a.rasheed@gmail 301 :UBC 0 : errorConflictingDefElem(defel, pstate);
302 : :
1812 akapila@postgresql.o 303 :CBC 24 : opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
304 : 24 : opts->twophase = defGetBoolean(defel);
305 : : }
1569 306 [ + + ]: 147 : else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
307 [ + + ]: 132 : strcmp(defel->defname, "disable_on_error") == 0)
308 : : {
309 [ - + ]: 13 : if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
1569 akapila@postgresql.o 310 :UBC 0 : errorConflictingDefElem(defel, pstate);
311 : :
1569 akapila@postgresql.o 312 :CBC 13 : opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
313 : 13 : opts->disableonerr = defGetBoolean(defel);
314 : : }
1188 rhaas@postgresql.org 315 [ + + ]: 134 : else if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED) &&
316 [ + + ]: 119 : strcmp(defel->defname, "password_required") == 0)
317 : : {
318 [ - + ]: 16 : if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
1188 rhaas@postgresql.org 319 :UBC 0 : errorConflictingDefElem(defel, pstate);
320 : :
1188 rhaas@postgresql.org 321 :CBC 16 : opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
322 : 16 : opts->passwordrequired = defGetBoolean(defel);
323 : : }
1183 324 [ + + ]: 118 : else if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER) &&
325 [ + + ]: 103 : strcmp(defel->defname, "run_as_owner") == 0)
326 : : {
327 [ - + ]: 11 : if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
1183 rhaas@postgresql.org 328 :UBC 0 : errorConflictingDefElem(defel, pstate);
329 : :
1183 rhaas@postgresql.org 330 :CBC 11 : opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
331 : 11 : opts->runasowner = defGetBoolean(defel);
332 : : }
882 akapila@postgresql.o 333 [ + + ]: 107 : else if (IsSet(supported_opts, SUBOPT_FAILOVER) &&
334 [ + + ]: 92 : strcmp(defel->defname, "failover") == 0)
335 : : {
336 [ - + ]: 16 : if (IsSet(opts->specified_opts, SUBOPT_FAILOVER))
882 akapila@postgresql.o 337 :UBC 0 : errorConflictingDefElem(defel, pstate);
338 : :
882 akapila@postgresql.o 339 :CBC 16 : opts->specified_opts |= SUBOPT_FAILOVER;
340 : 16 : opts->failover = defGetBoolean(defel);
341 : : }
342 akapila@postgresql.o 342 [ + + ]:GNC 91 : else if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES) &&
343 [ + + ]: 76 : strcmp(defel->defname, "retain_dead_tuples") == 0)
344 : : {
345 [ - + ]: 14 : if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
342 akapila@postgresql.o 346 :UNC 0 : errorConflictingDefElem(defel, pstate);
347 : :
342 akapila@postgresql.o 348 :GNC 14 : opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
349 : 14 : opts->retaindeadtuples = defGetBoolean(defel);
350 : : }
301 351 [ + + ]: 77 : else if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION) &&
352 [ + + ]: 62 : strcmp(defel->defname, "max_retention_duration") == 0)
353 : : {
354 [ - + ]: 22 : if (IsSet(opts->specified_opts, SUBOPT_MAX_RETENTION_DURATION))
301 akapila@postgresql.o 355 :UNC 0 : errorConflictingDefElem(defel, pstate);
356 : :
301 akapila@postgresql.o 357 :GNC 22 : opts->specified_opts |= SUBOPT_MAX_RETENTION_DURATION;
358 : 22 : opts->maxretention = defGetInt32(defel);
359 : :
19 360 [ + + ]: 18 : if (opts->maxretention < 0)
361 [ + - ]: 8 : ereport(ERROR,
362 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
363 : : errmsg("max_retention_duration cannot be negative"));
364 : : }
1440 akapila@postgresql.o 365 [ + + ]:CBC 55 : else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
366 [ + + ]: 40 : strcmp(defel->defname, "origin") == 0)
367 : : {
368 [ - + ]: 24 : if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
1440 akapila@postgresql.o 369 :UBC 0 : errorConflictingDefElem(defel, pstate);
370 : :
1440 akapila@postgresql.o 371 :CBC 24 : opts->specified_opts |= SUBOPT_ORIGIN;
372 : 24 : pfree(opts->origin);
373 : :
374 : : /*
375 : : * Even though the "origin" parameter allows only "none" and "any"
376 : : * values, it is implemented as a string type so that the
377 : : * parameter can be extended in future versions to support
378 : : * filtering using origin names specified by the user.
379 : : */
380 : 24 : opts->origin = defGetString(defel);
381 : :
382 [ + + + + ]: 34 : if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
383 : 10 : (pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
384 [ + - ]: 4 : ereport(ERROR,
385 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
386 : : errmsg("unrecognized origin value: \"%s\"", opts->origin));
387 : : }
1561 388 [ + + ]: 31 : else if (IsSet(supported_opts, SUBOPT_LSN) &&
389 [ + - ]: 15 : strcmp(defel->defname, "lsn") == 0)
390 : 11 : {
391 : 15 : char *lsn_str = defGetString(defel);
392 : : XLogRecPtr lsn;
393 : :
394 [ - + ]: 15 : if (IsSet(opts->specified_opts, SUBOPT_LSN))
1561 akapila@postgresql.o 395 :UBC 0 : errorConflictingDefElem(defel, pstate);
396 : :
397 : : /* Setting lsn = NONE is treated as resetting LSN */
1561 akapila@postgresql.o 398 [ + + ]:CBC 15 : if (strcmp(lsn_str, "none") == 0)
399 : 4 : lsn = InvalidXLogRecPtr;
400 : : else
401 : : {
402 : : /* Parse the argument as LSN */
403 : 11 : lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
404 : : CStringGetDatum(lsn_str)));
405 : :
236 alvherre@kurilemu.de 406 [ + + ]:GNC 11 : if (!XLogRecPtrIsValid(lsn))
1561 akapila@postgresql.o 407 [ + - ]:CBC 4 : ereport(ERROR,
408 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
409 : : errmsg("invalid WAL location (LSN): %s", lsn_str)));
410 : : }
411 : :
412 : 11 : opts->specified_opts |= SUBOPT_LSN;
413 : 11 : opts->lsn = lsn;
414 : : }
130 fujii@postgresql.org 415 [ + - ]:GNC 16 : else if (IsSet(supported_opts, SUBOPT_WAL_RECEIVER_TIMEOUT) &&
416 [ + + ]: 16 : strcmp(defel->defname, "wal_receiver_timeout") == 0)
417 : 8 : {
418 : : bool parsed;
419 : : int val;
420 : :
421 [ - + ]: 12 : if (IsSet(opts->specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
130 fujii@postgresql.org 422 :UNC 0 : errorConflictingDefElem(defel, pstate);
423 : :
130 fujii@postgresql.org 424 :GNC 12 : opts->specified_opts |= SUBOPT_WAL_RECEIVER_TIMEOUT;
425 : 12 : opts->wal_receiver_timeout = defGetString(defel);
426 : :
427 : : /*
428 : : * Test if the given value is valid for wal_receiver_timeout GUC.
429 : : * Skip this test if the value is -1, since -1 is allowed for the
430 : : * wal_receiver_timeout subscription option, but not for the GUC
431 : : * itself.
432 : : */
433 : 12 : parsed = parse_int(opts->wal_receiver_timeout, &val, 0, NULL);
434 [ + + - + ]: 12 : if (!parsed || val != -1)
435 : 8 : (void) set_config_option("wal_receiver_timeout", opts->wal_receiver_timeout,
436 : : PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
437 : : false, 0, false);
438 : : }
439 : : else
3331 peter_e@gmx.net 440 [ + - ]:CBC 4 : ereport(ERROR,
441 : : (errcode(ERRCODE_SYNTAX_ERROR),
442 : : errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
443 : : }
444 : :
445 : : /*
446 : : * We've been explicitly asked to not connect, that requires some
447 : : * additional processing.
448 : : */
1820 akapila@postgresql.o 449 [ + + + + ]: 584 : if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
450 : : {
451 : : /* Check for incompatible options from the user. */
452 [ + - ]: 117 : if (opts->enabled &&
453 [ + + ]: 117 : IsSet(opts->specified_opts, SUBOPT_ENABLED))
3386 peter_e@gmx.net 454 [ + - ]: 4 : ereport(ERROR,
455 : : (errcode(ERRCODE_SYNTAX_ERROR),
456 : : /*- translator: both %s are strings of the form "option = value" */
457 : : errmsg("%s and %s are mutually exclusive options",
458 : : "connect = false", "enabled = true")));
459 : :
1820 akapila@postgresql.o 460 [ + + ]: 113 : if (opts->create_slot &&
461 [ + + ]: 109 : IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
3386 peter_e@gmx.net 462 [ + - ]: 4 : ereport(ERROR,
463 : : (errcode(ERRCODE_SYNTAX_ERROR),
464 : : errmsg("%s and %s are mutually exclusive options",
465 : : "connect = false", "create_slot = true")));
466 : :
1820 akapila@postgresql.o 467 [ + + ]: 109 : if (opts->copy_data &&
468 [ + + ]: 105 : IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
3386 peter_e@gmx.net 469 [ + - ]: 4 : ereport(ERROR,
470 : : (errcode(ERRCODE_SYNTAX_ERROR),
471 : : errmsg("%s and %s are mutually exclusive options",
472 : : "connect = false", "copy_data = true")));
473 : :
474 : : /* Change the defaults of other options. */
1820 akapila@postgresql.o 475 : 105 : opts->enabled = false;
476 : 105 : opts->create_slot = false;
477 : 105 : opts->copy_data = false;
478 : : }
479 : :
480 : : /*
481 : : * Do additional checking for disallowed combination when slot_name = NONE
482 : : * was used.
483 : : */
484 [ + + ]: 572 : if (!opts->slot_name &&
485 [ + + ]: 548 : IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
486 : : {
1665 michael@paquier.xyz 487 [ + + ]: 97 : if (opts->enabled)
488 : : {
489 [ + + ]: 12 : if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
490 [ + - ]: 4 : ereport(ERROR,
491 : : (errcode(ERRCODE_SYNTAX_ERROR),
492 : : /*- translator: both %s are strings of the form "option = value" */
493 : : errmsg("%s and %s are mutually exclusive options",
494 : : "slot_name = NONE", "enabled = true")));
495 : : else
496 [ + - ]: 8 : ereport(ERROR,
497 : : (errcode(ERRCODE_SYNTAX_ERROR),
498 : : /*- translator: both %s are strings of the form "option = value" */
499 : : errmsg("subscription with %s must also set %s",
500 : : "slot_name = NONE", "enabled = false")));
501 : : }
502 : :
503 [ + + ]: 85 : if (opts->create_slot)
504 : : {
505 [ + + ]: 8 : if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
506 [ + - ]: 4 : ereport(ERROR,
507 : : (errcode(ERRCODE_SYNTAX_ERROR),
508 : : /*- translator: both %s are strings of the form "option = value" */
509 : : errmsg("%s and %s are mutually exclusive options",
510 : : "slot_name = NONE", "create_slot = true")));
511 : : else
512 [ + - ]: 4 : ereport(ERROR,
513 : : (errcode(ERRCODE_SYNTAX_ERROR),
514 : : /*- translator: both %s are strings of the form "option = value" */
515 : : errmsg("subscription with %s must also set %s",
516 : : "slot_name = NONE", "create_slot = false")));
517 : : }
518 : : }
3449 peter_e@gmx.net 519 : 552 : }
520 : :
521 : : /*
522 : : * Append a suitably-quoted identifier or string literal to buf.
523 : : * "quote" should be either a double-quote or single-quote character.
524 : : *
525 : : * Caution: this quoting logic is sufficient for identifiers and literals
526 : : * in the replication grammar, but not always in regular SQL. Specifically,
527 : : * it'd fail for a string literal if standard_conforming_strings is off.
528 : : */
529 : : static void
15 tgl@sss.pgh.pa.us 530 : 288 : appendQuotedString(StringInfo buf, const char *str, char quote)
531 : : {
532 : 288 : appendStringInfoChar(buf, quote);
533 [ + + ]: 9180 : while (*str)
534 : : {
535 : 8892 : char c = *str++;
536 : :
537 [ - + ]: 8892 : if (c == quote)
15 tgl@sss.pgh.pa.us 538 :UBC 0 : appendStringInfoChar(buf, c);
15 tgl@sss.pgh.pa.us 539 :CBC 8892 : appendStringInfoChar(buf, c);
540 : : }
541 : 288 : appendStringInfoChar(buf, quote);
542 : 288 : }
543 : :
544 : : #define appendQuotedIdentifier(b, s) appendQuotedString(b, s, '"')
545 : : #define appendQuotedLiteral(b, s) appendQuotedString(b, s, '\'')
546 : :
547 : : /*
548 : : * Check that the specified publications are present on the publisher.
549 : : */
550 : : static void
1552 akapila@postgresql.o 551 : 135 : check_publications(WalReceiverConn *wrconn, List *publications)
552 : : {
553 : : WalRcvExecResult *res;
554 : : StringInfoData cmd;
555 : : TupleTableSlot *slot;
556 : 135 : List *publicationsCopy = NIL;
557 : 135 : Oid tableRow[1] = {TEXTOID};
558 : :
236 drowley@postgresql.o 559 :GNC 135 : initStringInfo(&cmd);
560 : 135 : appendStringInfoString(&cmd, "SELECT t.pubname FROM\n"
561 : : " pg_catalog.pg_publication t WHERE\n"
562 : : " t.pubname IN (");
563 : 135 : GetPublicationsStr(publications, &cmd, true);
564 : 135 : appendStringInfoChar(&cmd, ')');
565 : :
566 : 135 : res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
567 : 135 : pfree(cmd.data);
568 : :
1552 akapila@postgresql.o 569 [ - + ]:CBC 135 : if (res->status != WALRCV_OK_TUPLES)
1552 akapila@postgresql.o 570 [ # # ]:UBC 0 : ereport(ERROR,
571 : : errmsg("could not receive list of publications from the publisher: %s",
572 : : res->err));
573 : :
1552 akapila@postgresql.o 574 :CBC 135 : publicationsCopy = list_copy(publications);
575 : :
576 : : /* Process publication(s). */
577 : 135 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
578 [ + + ]: 299 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
579 : : {
580 : : char *pubname;
581 : : bool isnull;
582 : :
583 : 164 : pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
584 [ - + ]: 164 : Assert(!isnull);
585 : :
586 : : /* Delete the publication present in publisher from the list. */
587 : 164 : publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
588 : 164 : ExecClearTuple(slot);
589 : : }
590 : :
591 : 135 : ExecDropSingleTupleTableSlot(slot);
592 : :
593 : 135 : walrcv_clear_result(res);
594 : :
595 [ + + ]: 135 : if (list_length(publicationsCopy))
596 : : {
597 : : /* Prepare the list of non-existent publication(s) for error message. */
598 : : StringInfoData pubnames;
599 : :
236 drowley@postgresql.o 600 :GNC 4 : initStringInfo(&pubnames);
601 : :
602 : 4 : GetPublicationsStr(publicationsCopy, &pubnames, false);
1552 akapila@postgresql.o 603 [ + - ]:CBC 4 : ereport(WARNING,
604 : : errcode(ERRCODE_UNDEFINED_OBJECT),
605 : : errmsg_plural("publication %s does not exist on the publisher",
606 : : "publications %s do not exist on the publisher",
607 : : list_length(publicationsCopy),
608 : : pubnames.data));
609 : : }
610 : 135 : }
611 : :
612 : : /*
613 : : * Auxiliary function to build a text array out of a list of String nodes.
614 : : */
615 : : static Datum
3449 peter_e@gmx.net 616 : 233 : publicationListToArray(List *publist)
617 : : {
618 : : ArrayType *arr;
619 : : Datum *datums;
620 : : MemoryContext memcxt;
621 : : MemoryContext oldcxt;
622 : :
623 : : /* Create memory context for temporary allocations. */
624 : 233 : memcxt = AllocSetContextCreate(CurrentMemoryContext,
625 : : "publicationListToArray to array",
626 : : ALLOCSET_DEFAULT_SIZES);
627 : 233 : oldcxt = MemoryContextSwitchTo(memcxt);
628 : :
202 michael@paquier.xyz 629 :GNC 233 : datums = palloc_array(Datum, list_length(publist));
630 : :
1911 peter@eisentraut.org 631 :CBC 233 : check_duplicates_in_publist(publist, datums);
632 : :
3449 peter_e@gmx.net 633 : 229 : MemoryContextSwitchTo(oldcxt);
634 : :
1460 peter@eisentraut.org 635 : 229 : arr = construct_array_builtin(datums, list_length(publist), TEXTOID);
636 : :
3449 peter_e@gmx.net 637 : 229 : MemoryContextDelete(memcxt);
638 : :
639 : 229 : return PointerGetDatum(arr);
640 : : }
641 : :
642 : : /*
643 : : * Create new subscription.
644 : : */
645 : : ObjectAddress
1811 dean.a.rasheed@gmail 646 : 312 : CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
647 : : bool isTopLevel)
648 : : {
649 : : Relation rel;
650 : : ObjectAddress myself;
651 : : Oid subid;
652 : : bool nulls[Natts_pg_subscription];
653 : : Datum values[Natts_pg_subscription];
3448 alvherre@alvh.no-ip. 654 : 312 : Oid owner = GetUserId();
655 : : HeapTuple tup;
656 : : Oid serverid;
657 : : char *conninfo;
658 : : char originname[NAMEDATALEN];
659 : : List *publications;
660 : : uint32 supported_opts;
1820 akapila@postgresql.o 661 : 312 : SubOpts opts = {0};
662 : : AclResult aclresult;
663 : :
664 : : /*
665 : : * Parse and check options.
666 : : *
667 : : * Connection and publication should not be specified here.
668 : : */
669 : 312 : supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
670 : : SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
671 : : SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
672 : : SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
673 : : SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
674 : : SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
675 : : SUBOPT_RETAIN_DEAD_TUPLES |
676 : : SUBOPT_MAX_RETENTION_DURATION |
677 : : SUBOPT_WAL_RECEIVER_TIMEOUT | SUBOPT_ORIGIN);
1811 dean.a.rasheed@gmail 678 : 312 : parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
679 : :
680 : : /*
681 : : * Since creating a replication slot is not transactional, rolling back
682 : : * the transaction leaves the created replication slot. So we cannot run
683 : : * CREATE SUBSCRIPTION inside a transaction block if creating a
684 : : * replication slot.
685 : : */
1820 akapila@postgresql.o 686 [ + + ]: 248 : if (opts.create_slot)
3056 peter_e@gmx.net 687 : 138 : PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
688 : :
689 : : /*
690 : : * We don't want to allow unprivileged users to be able to trigger
691 : : * attempts to access arbitrary network destinations, so require the user
692 : : * to have been specifically authorized to create subscriptions.
693 : : */
1188 rhaas@postgresql.org 694 [ + + ]: 244 : if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
3449 peter_e@gmx.net 695 [ + - ]: 4 : ereport(ERROR,
696 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
697 : : errmsg("permission denied to create subscription"),
698 : : errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
699 : : "pg_create_subscription")));
700 : :
701 : : /*
702 : : * Since a subscription is a database object, we also check for CREATE
703 : : * permission on the database.
704 : : */
1188 rhaas@postgresql.org 705 : 240 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
706 : : owner, ACL_CREATE);
707 [ + + ]: 240 : if (aclresult != ACLCHECK_OK)
708 : 8 : aclcheck_error(aclresult, OBJECT_DATABASE,
709 : 4 : get_database_name(MyDatabaseId));
710 : :
711 : : /*
712 : : * Non-superusers are required to set a password for authentication, and
713 : : * that password must be used by the target server, but the superuser can
714 : : * exempt a subscription from this requirement.
715 : : */
716 [ + + + + ]: 236 : if (!opts.passwordrequired && !superuser_arg(owner))
1138 tgl@sss.pgh.pa.us 717 [ + - ]: 4 : ereport(ERROR,
718 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
719 : : errmsg("password_required=false is superuser-only"),
720 : : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
721 : :
722 : : /*
723 : : * If built with appropriate switch, whine when regression-testing
724 : : * conventions for subscription names are violated.
725 : : */
726 : : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
727 : : if (strncmp(stmt->subname, "regress_", 8) != 0)
728 : : elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
729 : : #endif
730 : :
2717 andres@anarazel.de 731 : 232 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
732 : :
733 : : /* Check if name is used */
2779 734 : 232 : subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
735 : : ObjectIdGetDatum(MyDatabaseId), CStringGetDatum(stmt->subname));
3449 peter_e@gmx.net 736 [ + + ]: 232 : if (OidIsValid(subid))
737 : : {
738 [ + - ]: 5 : ereport(ERROR,
739 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
740 : : errmsg("subscription \"%s\" already exists",
741 : : stmt->subname)));
742 : : }
743 : :
744 : : /*
745 : : * Ensure that system configuration parameters are set appropriately to
746 : : * support retain_dead_tuples and max_retention_duration.
747 : : */
301 akapila@postgresql.o 748 :GNC 227 : CheckSubDeadTupleRetention(true, !opts.enabled, WARNING,
749 : 227 : opts.retaindeadtuples, opts.retaindeadtuples,
750 : 227 : (opts.maxretention > 0));
751 : :
1820 akapila@postgresql.o 752 [ + + ]:CBC 227 : if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
753 [ + - ]: 181 : opts.slot_name == NULL)
754 : 181 : opts.slot_name = stmt->subname;
755 : :
756 : : /* The default for synchronous_commit of subscriptions is off. */
757 [ + - ]: 227 : if (opts.synchronous_commit == NULL)
758 : 227 : opts.synchronous_commit = "off";
759 : :
760 : : /*
761 : : * The default for wal_receiver_timeout of subscriptions is -1, which
762 : : * means the value is inherited from the server configuration, command
763 : : * line, or role/database settings.
764 : : */
130 fujii@postgresql.org 765 [ + - ]:GNC 227 : if (opts.wal_receiver_timeout == NULL)
766 : 227 : opts.wal_receiver_timeout = "-1";
767 : :
768 : : /* Load the library providing us libpq calls. */
3449 peter_e@gmx.net 769 :CBC 227 : load_file("libpqwalreceiver", false);
770 : :
116 jdavis@postgresql.or 771 [ + + ]:GNC 227 : if (stmt->servername)
772 : : {
773 : : ForeignServer *server;
774 : :
775 [ - + ]: 22 : Assert(!stmt->conninfo);
776 : 22 : conninfo = NULL;
777 : :
778 : 22 : server = GetForeignServerByName(stmt->servername, false);
779 : 22 : aclresult = object_aclcheck(ForeignServerRelationId, server->serverid, owner, ACL_USAGE);
780 [ + + ]: 22 : if (aclresult != ACLCHECK_OK)
781 : 4 : aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername);
782 : :
783 : : /* make sure a user mapping exists */
784 : 18 : GetUserMapping(owner, server->serverid);
785 : :
786 : 14 : serverid = server->serverid;
98 787 : 14 : conninfo = ForeignServerConnectionString(owner, server);
788 : : }
789 : : else
790 : : {
116 791 [ - + ]: 205 : Assert(stmt->conninfo);
792 : :
793 : 205 : serverid = InvalidOid;
794 : 205 : conninfo = stmt->conninfo;
795 : : }
796 : :
797 : : /* Check the connection info string. */
1188 rhaas@postgresql.org 798 [ + + + + ]:CBC 215 : walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
799 : :
116 jdavis@postgresql.or 800 :GNC 203 : publications = stmt->publication;
801 : :
802 : : /* Everything ok, form a new tuple. */
3449 peter_e@gmx.net 803 :CBC 203 : memset(values, 0, sizeof(values));
804 : 203 : memset(nulls, false, sizeof(nulls));
805 : :
2779 andres@anarazel.de 806 : 203 : subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
807 : : Anum_pg_subscription_oid);
808 : 203 : values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
3449 peter_e@gmx.net 809 : 203 : values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
1545 akapila@postgresql.o 810 : 203 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
3449 peter_e@gmx.net 811 : 203 : values[Anum_pg_subscription_subname - 1] =
812 : 203 : DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
3448 alvherre@alvh.no-ip. 813 : 203 : values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
1820 akapila@postgresql.o 814 : 203 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
815 : 203 : values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
1268 816 : 203 : values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
1812 817 : 203 : values[Anum_pg_subscription_subtwophasestate - 1] =
818 [ + + ]: 203 : CharGetDatum(opts.twophase ?
819 : : LOGICALREP_TWOPHASE_STATE_PENDING :
820 : : LOGICALREP_TWOPHASE_STATE_DISABLED);
1569 821 : 203 : values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
1188 rhaas@postgresql.org 822 : 203 : values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
1183 823 : 203 : values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
882 akapila@postgresql.o 824 : 203 : values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
342 akapila@postgresql.o 825 :GNC 203 : values[Anum_pg_subscription_subretaindeadtuples - 1] =
826 : 203 : BoolGetDatum(opts.retaindeadtuples);
301 827 : 203 : values[Anum_pg_subscription_submaxretention - 1] =
828 : 203 : Int32GetDatum(opts.maxretention);
829 : 203 : values[Anum_pg_subscription_subretentionactive - 1] =
63 830 : 203 : BoolGetDatum(opts.retaindeadtuples);
71 peter@eisentraut.org 831 : 203 : values[Anum_pg_subscription_subserver - 1] = ObjectIdGetDatum(serverid);
116 jdavis@postgresql.or 832 [ + + ]: 203 : if (!OidIsValid(serverid))
833 : 193 : values[Anum_pg_subscription_subconninfo - 1] =
834 : 193 : CStringGetTextDatum(conninfo);
835 : : else
836 : 10 : nulls[Anum_pg_subscription_subconninfo - 1] = true;
1820 akapila@postgresql.o 837 [ + + ]:CBC 203 : if (opts.slot_name)
3339 peter_e@gmx.net 838 : 189 : values[Anum_pg_subscription_subslotname - 1] =
1820 akapila@postgresql.o 839 : 189 : DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
840 : : else
3339 peter_e@gmx.net 841 : 14 : nulls[Anum_pg_subscription_subslotname - 1] = true;
3364 842 : 203 : values[Anum_pg_subscription_subsynccommit - 1] =
1820 akapila@postgresql.o 843 : 203 : CStringGetTextDatum(opts.synchronous_commit);
130 fujii@postgresql.org 844 :GNC 203 : values[Anum_pg_subscription_subwalrcvtimeout - 1] =
845 : 203 : CStringGetTextDatum(opts.wal_receiver_timeout);
3449 peter_e@gmx.net 846 :CBC 199 : values[Anum_pg_subscription_subpublications - 1] =
3331 bruce@momjian.us 847 : 203 : publicationListToArray(publications);
1440 akapila@postgresql.o 848 : 199 : values[Anum_pg_subscription_suborigin - 1] =
849 : 199 : CStringGetTextDatum(opts.origin);
850 : :
3449 peter_e@gmx.net 851 : 199 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
852 : :
853 : : /* Insert tuple into catalog. */
2779 andres@anarazel.de 854 : 199 : CatalogTupleInsert(rel, tup);
3449 peter_e@gmx.net 855 : 199 : heap_freetuple(tup);
856 : :
3448 alvherre@alvh.no-ip. 857 : 199 : recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
858 : :
116 jdavis@postgresql.or 859 :GNC 199 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
860 : :
861 [ + + ]: 199 : if (stmt->servername)
862 : : {
863 : : ObjectAddress referenced;
864 : :
865 [ - + ]: 10 : Assert(OidIsValid(serverid));
866 : :
867 : 10 : ObjectAddressSet(referenced, ForeignServerRelationId, serverid);
868 : 10 : recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
869 : : }
870 : :
871 : : /*
872 : : * A replication origin is currently created for all subscriptions,
873 : : * including those that only contain sequences or are otherwise empty.
874 : : *
875 : : * XXX: While this is technically unnecessary, optimizing it would require
876 : : * additional logic to skip origin creation during DDL operations and
877 : : * apply workers initialization, and to handle origin creation dynamically
878 : : * when tables are added to the subscription. It is not clear whether
879 : : * preventing creation of origins is worth additional complexity.
880 : : */
1358 akapila@postgresql.o 881 :CBC 199 : ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
3449 peter_e@gmx.net 882 : 199 : replorigin_create(originname);
883 : :
884 : : /*
885 : : * Connect to remote side to execute requested commands and fetch table
886 : : * and sequence info.
887 : : */
1820 akapila@postgresql.o 888 [ + + ]: 199 : if (opts.connect)
889 : : {
890 : : char *err;
891 : : WalReceiverConn *wrconn;
892 : : bool must_use_password;
893 : :
894 : : /* Try to connect to the publisher. */
1188 rhaas@postgresql.org 895 [ - + - - ]: 130 : must_use_password = !superuser_arg(owner) && opts.passwordrequired;
876 akapila@postgresql.o 896 : 130 : wrconn = walrcv_connect(conninfo, true, true, must_use_password,
897 : : stmt->subname, &err);
3449 peter_e@gmx.net 898 [ + + ]: 130 : if (!wrconn)
899 [ + - ]: 4 : ereport(ERROR,
900 : : (errcode(ERRCODE_CONNECTION_FAILURE),
901 : : errmsg("subscription \"%s\" could not connect to the publisher: %s",
902 : : stmt->subname, err)));
903 : :
3443 904 [ + + ]: 126 : PG_TRY();
905 : : {
250 akapila@postgresql.o 906 :GNC 126 : bool has_tables = false;
907 : : List *pubrels;
908 : : char relation_state;
909 : :
1552 akapila@postgresql.o 910 :CBC 126 : check_publications(wrconn, publications);
250 akapila@postgresql.o 911 :GNC 126 : check_publications_origin_tables(wrconn, publications,
912 : 126 : opts.copy_data,
913 : 126 : opts.retaindeadtuples, opts.origin,
914 : : NULL, 0, stmt->subname);
915 : 126 : check_publications_origin_sequences(wrconn, publications,
916 : 126 : opts.copy_data, opts.origin,
917 : : NULL, 0, stmt->subname);
918 : :
342 919 [ + + ]: 126 : if (opts.retaindeadtuples)
920 : 3 : check_pub_dead_tuple_retention(wrconn);
921 : :
922 : : /*
923 : : * Set sync state based on if we were asked to do data copy or
924 : : * not.
925 : : */
250 926 [ + + ]: 126 : relation_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
927 : :
928 : : /*
929 : : * Build local relation status info. Relations are for both tables
930 : : * and sequences from the publisher.
931 : : */
932 : 126 : pubrels = fetch_relation_list(wrconn, publications);
933 : :
934 [ + + + + : 445 : foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
+ + ]
935 : : {
936 : : Oid relid;
937 : : char relkind;
938 : 195 : RangeVar *rv = pubrelinfo->rv;
939 : :
3385 peter_e@gmx.net 940 :CBC 195 : relid = RangeVarGetRelid(rv, AccessShareLock, false);
250 akapila@postgresql.o 941 :GNC 195 : relkind = get_rel_relkind(relid);
942 : :
943 : : /* Check for supported relkind. */
944 : 195 : CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
3332 peter_e@gmx.net 945 :CBC 195 : rv->schemaname, rv->relname);
250 akapila@postgresql.o 946 :GNC 195 : has_tables |= (relkind != RELKIND_SEQUENCE);
947 : 195 : AddSubscriptionRelState(subid, relid, relation_state,
948 : : InvalidXLogRecPtr, true);
949 : : }
950 : :
951 : : /*
952 : : * If requested, create permanent slot for the subscription. We
953 : : * won't use the initial snapshot for anything, so no need to
954 : : * export it.
955 : : *
956 : : * XXX: Similar to origins, it is not clear whether preventing the
957 : : * slot creation for empty and sequence-only subscriptions is
958 : : * worth additional complexity.
959 : : */
1820 akapila@postgresql.o 960 [ + + ]:CBC 125 : if (opts.create_slot)
961 : : {
1812 962 : 120 : bool twophase_enabled = false;
963 : :
1820 964 [ - + ]: 120 : Assert(opts.slot_name);
965 : :
966 : : /*
967 : : * Even if two_phase is set, don't create the slot with
968 : : * two-phase enabled. Will enable it once all the tables are
969 : : * synced and ready. This avoids race-conditions like prepared
970 : : * transactions being skipped due to changes not being applied
971 : : * due to checks in should_apply_changes_for_rel() when
972 : : * tablesync for the corresponding tables are in progress. See
973 : : * comments atop worker.c.
974 : : *
975 : : * Note that if tables were specified but copy_data is false
976 : : * then it is safe to enable two_phase up-front because those
977 : : * tables are already initially in READY state. When the
978 : : * subscription has no tables, we leave the twophase state as
979 : : * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
980 : : * PUBLICATION to work.
981 : : */
250 akapila@postgresql.o 982 [ + + + + :GNC 120 : if (opts.twophase && !opts.copy_data && has_tables)
+ - ]
1812 akapila@postgresql.o 983 :CBC 1 : twophase_enabled = true;
984 : :
985 : 120 : walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
986 : : opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
987 : :
988 [ + + ]: 120 : if (twophase_enabled)
989 : 1 : UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
990 : :
3357 peter_e@gmx.net 991 [ + - ]: 120 : ereport(NOTICE,
992 : : (errmsg("created replication slot \"%s\" on publisher",
993 : : opts.slot_name)));
994 : : }
995 : : }
2433 peter@eisentraut.org 996 : 1 : PG_FINALLY();
997 : : {
3443 peter_e@gmx.net 998 : 126 : walrcv_disconnect(wrconn);
999 : : }
1000 [ + + ]: 126 : PG_END_TRY();
1001 : : }
1002 : : else
3386 1003 [ + - ]: 69 : ereport(WARNING,
1004 : : (errmsg("subscription was created, but is not connected"),
1005 : : errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.")));
1006 : :
2717 andres@anarazel.de 1007 : 194 : table_close(rel, RowExclusiveLock);
1008 : :
1546 1009 : 194 : pgstat_create_subscription(subid);
1010 : :
1011 : : /*
1012 : : * Notify the launcher to start the apply worker if the subscription is
1013 : : * enabled, or to create the conflict detection slot if retain_dead_tuples
1014 : : * is enabled.
1015 : : *
1016 : : * Creating the conflict detection slot is essential even when the
1017 : : * subscription is not enabled. This ensures that dead tuples are
1018 : : * retained, which is necessary for accurately identifying the type of
1019 : : * conflict during replication.
1020 : : */
295 akapila@postgresql.o 1021 [ + + + + ]:GNC 194 : if (opts.enabled || opts.retaindeadtuples)
3347 peter_e@gmx.net 1022 :CBC 119 : ApplyLauncherWakeupAtCommit();
1023 : :
3449 1024 [ - + ]: 194 : InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
1025 : :
1026 : 194 : return myself;
1027 : : }
1028 : :
1029 : : static void
1552 akapila@postgresql.o 1030 : 39 : AlterSubscription_refresh(Subscription *sub, bool copy_data,
1031 : : List *validate_publications)
1032 : : {
1033 : : char *err;
250 akapila@postgresql.o 1034 :GNC 39 : List *pubrels = NIL;
1035 : : Oid *pubrel_local_oids;
1036 : : List *subrel_states;
1037 : 39 : List *sub_remove_rels = NIL;
1038 : : Oid *subrel_local_oids;
1039 : : Oid *subseq_local_oids;
1040 : : int subrel_count;
1041 : : ListCell *lc;
1042 : : int off;
1043 : 39 : int tbl_count = 0;
1044 : 39 : int seq_count = 0;
1964 akapila@postgresql.o 1045 :CBC 39 : Relation rel = NULL;
1046 : : typedef struct SubRemoveRels
1047 : : {
1048 : : Oid relid;
1049 : : char state;
1050 : : } SubRemoveRels;
1051 : :
1052 : : WalReceiverConn *wrconn;
1053 : : bool must_use_password;
1054 : :
1055 : : /* Load the library providing us libpq calls. */
3386 peter_e@gmx.net 1056 : 39 : load_file("libpqwalreceiver", false);
1057 : :
1058 : : /* Try to connect to the publisher. */
987 akapila@postgresql.o 1059 [ + - + + ]: 39 : must_use_password = sub->passwordrequired && !sub->ownersuperuser;
876 1060 : 39 : wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
1061 : : sub->name, &err);
1880 alvherre@alvh.no-ip. 1062 [ - + ]: 38 : if (!wrconn)
1880 alvherre@alvh.no-ip. 1063 [ # # ]:UBC 0 : ereport(ERROR,
1064 : : (errcode(ERRCODE_CONNECTION_FAILURE),
1065 : : errmsg("subscription \"%s\" could not connect to the publisher: %s",
1066 : : sub->name, err)));
1067 : :
1964 akapila@postgresql.o 1068 [ + - ]:CBC 38 : PG_TRY();
1069 : : {
1552 1070 [ + + ]: 38 : if (validate_publications)
1071 : 9 : check_publications(wrconn, validate_publications);
1072 : :
1073 : : /* Get the relation list from publisher. */
250 akapila@postgresql.o 1074 :GNC 38 : pubrels = fetch_relation_list(wrconn, sub->publications);
1075 : :
1076 : : /* Get local relation list. */
1077 : 38 : subrel_states = GetSubscriptionRelations(sub->oid, true, true, false);
1391 akapila@postgresql.o 1078 :CBC 38 : subrel_count = list_length(subrel_states);
1079 : :
1080 : : /*
1081 : : * Build qsorted arrays of local table oids and sequence oids for
1082 : : * faster lookup. This can potentially contain all tables and
1083 : : * sequences in the database so speed of lookup is important.
1084 : : *
1085 : : * We do not yet know the exact count of tables and sequences, so we
1086 : : * allocate separate arrays for table OIDs and sequence OIDs based on
1087 : : * the total number of relations (subrel_count).
1088 : : */
1089 : 38 : subrel_local_oids = palloc(subrel_count * sizeof(Oid));
250 akapila@postgresql.o 1090 :GNC 38 : subseq_local_oids = palloc(subrel_count * sizeof(Oid));
1964 akapila@postgresql.o 1091 [ + + + + :CBC 136 : foreach(lc, subrel_states)
+ + ]
1092 : : {
1093 : 98 : SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
1094 : :
250 akapila@postgresql.o 1095 [ + + ]:GNC 98 : if (get_rel_relkind(relstate->relid) == RELKIND_SEQUENCE)
1096 : 9 : subseq_local_oids[seq_count++] = relstate->relid;
1097 : : else
1098 : 89 : subrel_local_oids[tbl_count++] = relstate->relid;
1099 : : }
1100 : :
1101 : 38 : qsort(subrel_local_oids, tbl_count, sizeof(Oid), oid_cmp);
1102 : 38 : check_publications_origin_tables(wrconn, sub->publications, copy_data,
1103 : 38 : sub->retaindeadtuples, sub->origin,
1104 : : subrel_local_oids, tbl_count,
1105 : : sub->name);
1106 : :
1107 : 38 : qsort(subseq_local_oids, seq_count, sizeof(Oid), oid_cmp);
1108 : 38 : check_publications_origin_sequences(wrconn, sub->publications,
1109 : : copy_data, sub->origin,
1110 : : subseq_local_oids, seq_count,
1111 : : sub->name);
1112 : :
1113 : : /*
1114 : : * Walk over the remote relations and try to match them to locally
1115 : : * known relations. If the relation is not known locally create a new
1116 : : * state for it.
1117 : : *
1118 : : * Also builds array of local oids of remote relations for the next
1119 : : * step.
1120 : : */
1964 akapila@postgresql.o 1121 :CBC 38 : off = 0;
250 akapila@postgresql.o 1122 :GNC 38 : pubrel_local_oids = palloc(list_length(pubrels) * sizeof(Oid));
1123 : :
1124 [ + + + + : 183 : foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
+ + ]
1125 : : {
1126 : 107 : RangeVar *rv = pubrelinfo->rv;
1127 : : Oid relid;
1128 : : char relkind;
1129 : :
1964 akapila@postgresql.o 1130 :CBC 107 : relid = RangeVarGetRelid(rv, AccessShareLock, false);
250 akapila@postgresql.o 1131 :GNC 107 : relkind = get_rel_relkind(relid);
1132 : :
1133 : : /* Check for supported relkind. */
1134 : 107 : CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
1964 akapila@postgresql.o 1135 :CBC 107 : rv->schemaname, rv->relname);
1136 : :
1137 : 107 : pubrel_local_oids[off++] = relid;
1138 : :
1139 [ + + ]: 107 : if (!bsearch(&relid, subrel_local_oids,
250 akapila@postgresql.o 1140 :GNC 39 : tbl_count, sizeof(Oid), oid_cmp) &&
1141 [ + + ]: 39 : !bsearch(&relid, subseq_local_oids,
1142 : : seq_count, sizeof(Oid), oid_cmp))
1143 : : {
1964 akapila@postgresql.o 1144 [ + + ]:CBC 30 : AddSubscriptionRelState(sub->oid, relid,
1145 : : copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
1146 : : InvalidXLogRecPtr, true);
1147 [ + + - + ]: 30 : ereport(DEBUG1,
1148 : : errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"",
1149 : : relkind == RELKIND_SEQUENCE ? "sequence" : "table",
1150 : : rv->schemaname, rv->relname, sub->name));
1151 : : }
1152 : : }
1153 : :
1154 : : /*
1155 : : * Next remove state for tables we should not care about anymore using
1156 : : * the data we collected above
1157 : : */
250 akapila@postgresql.o 1158 :GNC 38 : qsort(pubrel_local_oids, list_length(pubrels), sizeof(Oid), oid_cmp);
1159 : :
1160 [ + + ]: 127 : for (off = 0; off < tbl_count; off++)
1161 : : {
1964 akapila@postgresql.o 1162 :CBC 89 : Oid relid = subrel_local_oids[off];
1163 : :
1164 [ + + ]: 89 : if (!bsearch(&relid, pubrel_local_oids,
250 akapila@postgresql.o 1165 :GNC 89 : list_length(pubrels), sizeof(Oid), oid_cmp))
1166 : : {
1167 : : char state;
1168 : : XLogRecPtr statelsn;
202 michael@paquier.xyz 1169 : 21 : SubRemoveRels *remove_rel = palloc_object(SubRemoveRels);
1170 : :
1171 : : /*
1172 : : * Lock pg_subscription_rel with AccessExclusiveLock to
1173 : : * prevent any race conditions with the apply worker
1174 : : * re-launching workers at the same time this code is trying
1175 : : * to remove those tables.
1176 : : *
1177 : : * Even if new worker for this particular rel is restarted it
1178 : : * won't be able to make any progress as we hold exclusive
1179 : : * lock on pg_subscription_rel till the transaction end. It
1180 : : * will simply exit as there is no corresponding rel entry.
1181 : : *
1182 : : * This locking also ensures that the state of rels won't
1183 : : * change till we are done with this refresh operation.
1184 : : */
1964 akapila@postgresql.o 1185 [ + + ]:CBC 21 : if (!rel)
1186 : 9 : rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
1187 : :
1188 : : /* Last known rel state. */
1189 : 21 : state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
1190 : :
1191 : 21 : RemoveSubscriptionRel(sub->oid, relid);
1192 : :
250 akapila@postgresql.o 1193 :GNC 21 : remove_rel->relid = relid;
1194 : 21 : remove_rel->state = state;
1195 : :
1196 : 21 : sub_remove_rels = lappend(sub_remove_rels, remove_rel);
1197 : :
245 1198 : 21 : logicalrep_worker_stop(WORKERTYPE_TABLESYNC, sub->oid, relid);
1199 : :
1200 : : /*
1201 : : * For READY state, we would have already dropped the
1202 : : * tablesync origin.
1203 : : */
1387 akapila@postgresql.o 1204 [ - + ]:CBC 21 : if (state != SUBREL_STATE_READY)
1205 : : {
1206 : : char originname[NAMEDATALEN];
1207 : :
1208 : : /*
1209 : : * Drop the tablesync's origin tracking if exists.
1210 : : *
1211 : : * It is possible that the origin is not yet created for
1212 : : * tablesync worker, this can happen for the states before
1213 : : * SUBREL_STATE_DATASYNC. The tablesync worker or apply
1214 : : * worker can also concurrently try to drop the origin and
1215 : : * by this time the origin might be already removed. For
1216 : : * these reasons, passing missing_ok = true.
1217 : : */
1358 akapila@postgresql.o 1218 :UBC 0 : ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
1219 : : sizeof(originname));
1964 1220 : 0 : replorigin_drop_by_name(originname, true, false);
1221 : : }
1222 : :
1964 akapila@postgresql.o 1223 [ + + ]:CBC 21 : ereport(DEBUG1,
1224 : : (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
1225 : : get_namespace_name(get_rel_namespace(relid)),
1226 : : get_rel_name(relid),
1227 : : sub->name)));
1228 : : }
1229 : : }
1230 : :
1231 : : /*
1232 : : * Drop the tablesync slots associated with removed tables. This has
1233 : : * to be at the end because otherwise if there is an error while doing
1234 : : * the database operations we won't be able to rollback dropped slots.
1235 : : */
209 akapila@postgresql.o 1236 [ + + + + :GNC 97 : foreach_ptr(SubRemoveRels, sub_remove_rel, sub_remove_rels)
+ + ]
1237 : : {
1238 [ - + ]: 21 : if (sub_remove_rel->state != SUBREL_STATE_READY &&
209 akapila@postgresql.o 1239 [ # # ]:UNC 0 : sub_remove_rel->state != SUBREL_STATE_SYNCDONE)
1240 : : {
1964 akapila@postgresql.o 1241 :UBC 0 : char syncslotname[NAMEDATALEN] = {0};
1242 : :
1243 : : /*
1244 : : * For READY/SYNCDONE states we know the tablesync slot has
1245 : : * already been dropped by the tablesync worker.
1246 : : *
1247 : : * For other states, there is no certainty, maybe the slot
1248 : : * does not exist yet. Also, if we fail after removing some of
1249 : : * the slots, next time, it will again try to drop already
1250 : : * dropped slots and fail. For these reasons, we allow
1251 : : * missing_ok = true for the drop.
1252 : : */
209 akapila@postgresql.o 1253 :UNC 0 : ReplicationSlotNameForTablesync(sub->oid, sub_remove_rel->relid,
1254 : : syncslotname, sizeof(syncslotname));
1964 akapila@postgresql.o 1255 :UBC 0 : ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
1256 : : }
1257 : : }
1258 : :
1259 : : /*
1260 : : * Next remove state for sequences we should not care about anymore
1261 : : * using the data we collected above
1262 : : */
250 akapila@postgresql.o 1263 [ + + ]:GNC 47 : for (off = 0; off < seq_count; off++)
1264 : : {
1265 : 9 : Oid relid = subseq_local_oids[off];
1266 : :
1267 [ - + ]: 9 : if (!bsearch(&relid, pubrel_local_oids,
1268 : 9 : list_length(pubrels), sizeof(Oid), oid_cmp))
1269 : : {
1270 : : /*
1271 : : * This locking ensures that the state of rels won't change
1272 : : * till we are done with this refresh operation.
1273 : : */
250 akapila@postgresql.o 1274 [ # # ]:UNC 0 : if (!rel)
1275 : 0 : rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
1276 : :
1277 : 0 : RemoveSubscriptionRel(sub->oid, relid);
1278 : :
1279 [ # # ]: 0 : ereport(DEBUG1,
1280 : : errmsg_internal("sequence \"%s.%s\" removed from subscription \"%s\"",
1281 : : get_namespace_name(get_rel_namespace(relid)),
1282 : : get_rel_name(relid),
1283 : : sub->name));
1284 : : }
1285 : : }
1286 : : }
1964 akapila@postgresql.o 1287 :UBC 0 : PG_FINALLY();
1288 : : {
1880 alvherre@alvh.no-ip. 1289 :CBC 38 : walrcv_disconnect(wrconn);
1290 : : }
1964 akapila@postgresql.o 1291 [ - + ]: 38 : PG_END_TRY();
1292 : :
1293 [ + + ]: 38 : if (rel)
1294 : 9 : table_close(rel, NoLock);
3386 peter_e@gmx.net 1295 : 38 : }
1296 : :
1297 : : /*
1298 : : * Marks all sequences with INIT state.
1299 : : */
1300 : : static void
250 akapila@postgresql.o 1301 :GNC 3 : AlterSubscription_refresh_seq(Subscription *sub)
1302 : : {
1303 : 3 : char *err = NULL;
1304 : : WalReceiverConn *wrconn;
1305 : : bool must_use_password;
1306 : :
1307 : : /* Load the library providing us libpq calls. */
1308 : 3 : load_file("libpqwalreceiver", false);
1309 : :
1310 : : /* Try to connect to the publisher. */
1311 [ + - - + ]: 3 : must_use_password = sub->passwordrequired && !sub->ownersuperuser;
1312 : 3 : wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
1313 : : sub->name, &err);
1314 [ - + ]: 3 : if (!wrconn)
250 akapila@postgresql.o 1315 [ # # ]:UNC 0 : ereport(ERROR,
1316 : : errcode(ERRCODE_CONNECTION_FAILURE),
1317 : : errmsg("subscription \"%s\" could not connect to the publisher: %s",
1318 : : sub->name, err));
1319 : :
250 akapila@postgresql.o 1320 [ + - ]:GNC 3 : PG_TRY();
1321 : : {
1322 : : List *subrel_states;
1323 : :
1324 : 3 : check_publications_origin_sequences(wrconn, sub->publications, true,
1325 : : sub->origin, NULL, 0, sub->name);
1326 : :
1327 : : /* Get local sequence list. */
1328 : 3 : subrel_states = GetSubscriptionRelations(sub->oid, false, true, false);
1329 [ + - + + : 19 : foreach_ptr(SubscriptionRelState, subrel, subrel_states)
+ + ]
1330 : : {
1331 : 13 : Oid relid = subrel->relid;
1332 : :
1333 : 13 : UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT,
1334 : : InvalidXLogRecPtr, false);
1335 [ - + ]: 13 : ereport(DEBUG1,
1336 : : errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
1337 : : get_namespace_name(get_rel_namespace(relid)),
1338 : : get_rel_name(relid),
1339 : : sub->name));
1340 : : }
1341 : : }
250 akapila@postgresql.o 1342 :UNC 0 : PG_FINALLY();
1343 : : {
250 akapila@postgresql.o 1344 :GNC 3 : walrcv_disconnect(wrconn);
1345 : : }
1346 [ - + ]: 3 : PG_END_TRY();
1347 : 3 : }
1348 : :
1349 : : /*
1350 : : * Common checks for altering failover, two_phase, and retain_dead_tuples
1351 : : * options.
1352 : : */
1353 : : static void
706 akapila@postgresql.o 1354 :CBC 14 : CheckAlterSubOption(Subscription *sub, const char *option,
1355 : : bool slot_needs_update, bool isTopLevel)
1356 : : {
342 akapila@postgresql.o 1357 [ + + + + :GNC 14 : Assert(strcmp(option, "failover") == 0 ||
- + ]
1358 : : strcmp(option, "two_phase") == 0 ||
1359 : : strcmp(option, "retain_dead_tuples") == 0);
1360 : :
1361 : : /*
1362 : : * Altering the retain_dead_tuples option does not update the slot on the
1363 : : * publisher.
1364 : : */
1365 [ + + - + ]: 14 : Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0);
1366 : :
1367 : : /*
1368 : : * Do not allow changing the option if the subscription is enabled. This
1369 : : * is because both failover and two_phase options of the slot on the
1370 : : * publisher cannot be modified if the slot is currently acquired by the
1371 : : * existing walsender.
1372 : : *
1373 : : * Note that two_phase is enabled (aka changed from 'false' to 'true') on
1374 : : * the publisher by the existing walsender, so we could have allowed that
1375 : : * even when the subscription is enabled. But we kept this restriction for
1376 : : * the sake of consistency and simplicity.
1377 : : *
1378 : : * Additionally, do not allow changing the retain_dead_tuples option when
1379 : : * the subscription is enabled to prevent race conditions arising from the
1380 : : * new option value being acknowledged asynchronously by the launcher and
1381 : : * apply workers.
1382 : : *
1383 : : * Without the restriction, a race condition may arise when a user
1384 : : * disables and immediately re-enables the retain_dead_tuples option. In
1385 : : * this case, the launcher might drop the slot upon noticing the disabled
1386 : : * action, while the apply worker may keep maintaining
1387 : : * oldest_nonremovable_xid without noticing the option change. During this
1388 : : * period, a transaction ID wraparound could falsely make this ID appear
1389 : : * as if it originates from the future w.r.t the transaction ID stored in
1390 : : * the slot maintained by launcher.
1391 : : *
1392 : : * Similarly, if the user enables retain_dead_tuples concurrently with the
1393 : : * launcher starting the worker, the apply worker may start calculating
1394 : : * oldest_nonremovable_xid before the launcher notices the enable action.
1395 : : * Consequently, the launcher may update slot.xmin to a newer value than
1396 : : * that maintained by the worker. In subsequent cycles, upon integrating
1397 : : * the worker's oldest_nonremovable_xid, the launcher might detect a
1398 : : * retreat in the calculated xmin, necessitating additional handling.
1399 : : *
1400 : : * XXX To address the above race conditions, we can define
1401 : : * oldest_nonremovable_xid as FullTransactionId and adds the check to
1402 : : * disallow retreating the conflict slot's xmin. For now, we kept the
1403 : : * implementation simple by disallowing change to the retain_dead_tuples,
1404 : : * but in the future we can change this after some more analysis.
1405 : : *
1406 : : * Note that we could restrict only the enabling of retain_dead_tuples to
1407 : : * avoid the race conditions described above, but we maintain the
1408 : : * restriction for both enable and disable operations for the sake of
1409 : : * consistency.
1410 : : */
706 akapila@postgresql.o 1411 [ + + ]:CBC 14 : if (sub->enabled)
1412 [ + - ]: 2 : ereport(ERROR,
1413 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1414 : : errmsg("cannot set option \"%s\" for enabled subscription",
1415 : : option)));
1416 : :
1417 [ + + ]: 12 : if (slot_needs_update)
1418 : : {
1419 : : StringInfoData cmd;
1420 : :
1421 : : /*
1422 : : * A valid slot must be associated with the subscription for us to
1423 : : * modify any of the slot's properties.
1424 : : */
1425 [ - + ]: 9 : if (!sub->slotname)
706 akapila@postgresql.o 1426 [ # # ]:UBC 0 : ereport(ERROR,
1427 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1428 : : errmsg("cannot set option \"%s\" for a subscription that does not have a slot name",
1429 : : option)));
1430 : :
1431 : : /* The changed option of the slot can't be rolled back. */
706 akapila@postgresql.o 1432 :CBC 9 : initStringInfo(&cmd);
1433 : 9 : appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
1434 : :
1435 : 9 : PreventInTransactionBlock(isTopLevel, cmd.data);
1436 : 5 : pfree(cmd.data);
1437 : : }
1438 : 8 : }
1439 : :
1440 : : /*
1441 : : * Alter the existing subscription.
1442 : : */
1443 : : ObjectAddress
1811 dean.a.rasheed@gmail 1444 : 358 : AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1445 : : bool isTopLevel)
1446 : : {
1447 : : Relation rel;
1448 : : ObjectAddress myself;
1449 : : bool nulls[Natts_pg_subscription];
1450 : : bool replaces[Natts_pg_subscription];
1451 : : Datum values[Natts_pg_subscription];
1452 : : HeapTuple tup;
1453 : : Oid subid;
13 jdavis@postgresql.or 1454 :GNC 358 : bool orig_conninfo_needed = true;
3386 peter_e@gmx.net 1455 :CBC 358 : bool update_tuple = false;
706 akapila@postgresql.o 1456 : 358 : bool update_failover = false;
1457 : 358 : bool update_two_phase = false;
342 akapila@postgresql.o 1458 :GNC 358 : bool check_pub_rdt = false;
1459 : : bool retain_dead_tuples;
1460 : : int max_retention;
1461 : : bool retention_active;
46 jdavis@postgresql.or 1462 : 358 : char *new_conninfo = NULL;
1463 : : char *origin;
1464 : : Subscription *sub;
1465 : : Form_pg_subscription form;
1466 : : uint32 supported_opts;
1820 akapila@postgresql.o 1467 :CBC 358 : SubOpts opts = {0};
1468 : :
2717 andres@anarazel.de 1469 : 358 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1470 : :
1471 : : /* Fetch the existing tuple. */
326 peter@eisentraut.org 1472 :GNC 358 : tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
1473 : : CStringGetDatum(stmt->subname));
1474 : :
3449 peter_e@gmx.net 1475 [ + + ]:CBC 358 : if (!HeapTupleIsValid(tup))
1476 [ + - ]: 4 : ereport(ERROR,
1477 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
1478 : : errmsg("subscription \"%s\" does not exist",
1479 : : stmt->subname)));
1480 : :
2779 andres@anarazel.de 1481 : 354 : form = (Form_pg_subscription) GETSTRUCT(tup);
1482 : 354 : subid = form->oid;
1483 : :
1484 : : /* must be owner */
1325 peter@eisentraut.org 1485 [ - + ]: 354 : if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
3132 peter_e@gmx.net 1486 :UBC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
3449 1487 : 0 : stmt->subname);
1488 : :
1489 : : /* parse and check options */
13 jdavis@postgresql.or 1490 [ + + + + :GNC 354 : switch (stmt->kind)
+ + + ]
1491 : : {
1492 : 160 : case ALTER_SUBSCRIPTION_OPTIONS:
1493 : 160 : supported_opts = (SUBOPT_SLOT_NAME |
1494 : : SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
1495 : : SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
1496 : : SUBOPT_DISABLE_ON_ERR |
1497 : : SUBOPT_PASSWORD_REQUIRED |
1498 : : SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
1499 : : SUBOPT_RETAIN_DEAD_TUPLES |
1500 : : SUBOPT_MAX_RETENTION_DURATION |
1501 : : SUBOPT_WAL_RECEIVER_TIMEOUT |
1502 : : SUBOPT_ORIGIN);
1503 : 160 : break;
1504 : :
1505 : 62 : case ALTER_SUBSCRIPTION_ENABLED:
1506 : 62 : supported_opts = SUBOPT_ENABLED;
1507 : 62 : break;
1508 : :
1509 : 19 : case ALTER_SUBSCRIPTION_SET_PUBLICATION:
1510 : 19 : supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
1511 : 19 : break;
1512 : :
1513 : 35 : case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
1514 : : case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
1515 : 35 : supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
1516 : 35 : break;
1517 : :
1518 : 37 : case ALTER_SUBSCRIPTION_REFRESH_PUBLICATION:
1519 : 37 : supported_opts = SUBOPT_COPY_DATA;
1520 : 37 : break;
1521 : :
1522 : 15 : case ALTER_SUBSCRIPTION_SKIP:
1523 : 15 : supported_opts = SUBOPT_LSN;
1524 : 15 : break;
1525 : :
1526 : 26 : default:
1527 : 26 : supported_opts = 0;
1528 : 26 : break;
1529 : : }
1530 : :
1531 [ + + ]: 354 : if (supported_opts > 0)
1532 : 328 : parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
1533 : :
1534 : : /*
1535 : : * Ensure that ALTER SUBSCRIPTION commands that could be used to fix a
1536 : : * broken connection or prepare to drop a broken subscription don't
1537 : : * attempt to construct the conninfo. Otherwise, we might encounter the
1538 : : * error the user is trying to fix.
1539 : : *
1540 : : * Specifically, ALTER SUBSCRIPTION DISABLE, ALTER SUBSCRIPTION SERVER,
1541 : : * ALTER SUBSCRIPTION CONNECTION, or ALTER SUBSCRIPTION SET
1542 : : * (slot_name=NONE).
1543 : : *
1544 : : * NB: if the user specifies multiple SET options, then we may still need
1545 : : * to construct conninfo even if slot_name is set to NONE.
1546 : : */
1547 [ + + ]: 330 : if (stmt->kind == ALTER_SUBSCRIPTION_ENABLED)
1548 : : {
1549 [ + - + + ]: 62 : if (opts.specified_opts == SUBOPT_ENABLED && !opts.enabled)
1550 : 28 : orig_conninfo_needed = false;
1551 : : }
1552 [ + + ]: 268 : else if (stmt->kind == ALTER_SUBSCRIPTION_SERVER ||
1553 [ + + ]: 267 : stmt->kind == ALTER_SUBSCRIPTION_CONNECTION)
1554 : : {
1555 : 23 : orig_conninfo_needed = false;
1556 : : }
1557 [ + + ]: 245 : else if (stmt->kind == ALTER_SUBSCRIPTION_OPTIONS)
1558 : : {
1559 : : /* ... SET (slot_name = NONE) with no other options */
1560 [ + + + + ]: 140 : if (opts.specified_opts == SUBOPT_SLOT_NAME && !opts.slot_name)
1561 : 51 : orig_conninfo_needed = false;
1562 : : }
1563 : :
1564 : : /*
1565 : : * Skip ACL checks on the subscription's foreign server, if any. If
1566 : : * changing the server (or replacing it with a raw connection), then the
1567 : : * old one will be removed anyway. If changing something unrelated,
1568 : : * there's no need to do an additional ACL check here; that will be done
1569 : : * by the subscription worker.
1570 : : */
1571 : 330 : sub = GetSubscription(subid, false, orig_conninfo_needed, false);
1572 : :
342 akapila@postgresql.o 1573 : 330 : retain_dead_tuples = sub->retaindeadtuples;
1574 : 330 : origin = sub->origin;
301 1575 : 330 : max_retention = sub->maxretention;
1576 : 330 : retention_active = sub->retentionactive;
1577 : :
1578 : : /*
1579 : : * Don't allow non-superuser modification of a subscription with
1580 : : * password_required=false.
1581 : : */
1188 rhaas@postgresql.org 1582 [ + + - + ]:CBC 330 : if (!sub->passwordrequired && !superuser())
1188 rhaas@postgresql.org 1583 [ # # ]:UBC 0 : ereport(ERROR,
1584 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1585 : : errmsg("password_required=false is superuser-only"),
1586 : : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1587 : :
1588 : : /* Lock the subscription so nobody else can do anything with it. */
3284 peter_e@gmx.net 1589 :CBC 330 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
1590 : :
1591 : : /* Form a new tuple. */
3449 1592 : 330 : memset(values, 0, sizeof(values));
1593 : 330 : memset(nulls, false, sizeof(nulls));
1594 : 330 : memset(replaces, false, sizeof(replaces));
1595 : :
116 jdavis@postgresql.or 1596 :GNC 330 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
1597 : :
3386 peter_e@gmx.net 1598 [ + + + + :CBC 330 : switch (stmt->kind)
+ + + + +
- ]
1599 : : {
1600 : 140 : case ALTER_SUBSCRIPTION_OPTIONS:
1601 : : {
1820 akapila@postgresql.o 1602 [ + + ]: 140 : if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1603 : : {
1604 : : /*
1605 : : * The subscription must be disabled to allow slot_name as
1606 : : * 'none', otherwise, the apply worker will repeatedly try
1607 : : * to stream the data using that slot_name which neither
1608 : : * exists on the publisher nor the user will be allowed to
1609 : : * create it.
1610 : : */
1611 [ - + - - ]: 55 : if (sub->enabled && !opts.slot_name)
3339 peter_e@gmx.net 1612 [ # # ]:UBC 0 : ereport(ERROR,
1613 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1614 : : errmsg("cannot set %s for enabled subscription",
1615 : : "slot_name = NONE")));
1616 : :
1820 akapila@postgresql.o 1617 [ + + ]:CBC 55 : if (opts.slot_name)
3339 peter_e@gmx.net 1618 : 4 : values[Anum_pg_subscription_subslotname - 1] =
1820 akapila@postgresql.o 1619 : 4 : DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
1620 : : else
3339 peter_e@gmx.net 1621 : 51 : nulls[Anum_pg_subscription_subslotname - 1] = true;
3364 1622 : 55 : replaces[Anum_pg_subscription_subslotname - 1] = true;
1623 : : }
1624 : :
1820 akapila@postgresql.o 1625 [ + + ]: 140 : if (opts.synchronous_commit)
1626 : : {
3364 peter_e@gmx.net 1627 : 4 : values[Anum_pg_subscription_subsynccommit - 1] =
1820 akapila@postgresql.o 1628 : 4 : CStringGetTextDatum(opts.synchronous_commit);
3364 peter_e@gmx.net 1629 : 4 : replaces[Anum_pg_subscription_subsynccommit - 1] = true;
1630 : : }
1631 : :
1820 akapila@postgresql.o 1632 [ + + ]: 140 : if (IsSet(opts.specified_opts, SUBOPT_BINARY))
1633 : : {
2173 tgl@sss.pgh.pa.us 1634 : 10 : values[Anum_pg_subscription_subbinary - 1] =
1820 akapila@postgresql.o 1635 : 10 : BoolGetDatum(opts.binary);
2173 tgl@sss.pgh.pa.us 1636 : 10 : replaces[Anum_pg_subscription_subbinary - 1] = true;
1637 : : }
1638 : :
1820 akapila@postgresql.o 1639 [ + + ]: 140 : if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
1640 : : {
2126 1641 : 18 : values[Anum_pg_subscription_substream - 1] =
1268 1642 : 18 : CharGetDatum(opts.streaming);
2126 1643 : 18 : replaces[Anum_pg_subscription_substream - 1] = true;
1644 : : }
1645 : :
1569 1646 [ + + ]: 140 : if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
1647 : : {
1648 : : values[Anum_pg_subscription_subdisableonerr - 1]
1649 : 4 : = BoolGetDatum(opts.disableonerr);
1650 : : replaces[Anum_pg_subscription_subdisableonerr - 1]
1651 : 4 : = true;
1652 : : }
1653 : :
1188 rhaas@postgresql.org 1654 [ + + ]: 140 : if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
1655 : : {
1656 : : /* Non-superuser may not disable password_required. */
1657 [ + + - + ]: 8 : if (!opts.passwordrequired && !superuser())
1188 rhaas@postgresql.org 1658 [ # # ]:UBC 0 : ereport(ERROR,
1659 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1660 : : errmsg("password_required=false is superuser-only"),
1661 : : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1662 : :
1663 : : values[Anum_pg_subscription_subpasswordrequired - 1]
1188 rhaas@postgresql.org 1664 :CBC 8 : = BoolGetDatum(opts.passwordrequired);
1665 : : replaces[Anum_pg_subscription_subpasswordrequired - 1]
1666 : 8 : = true;
1667 : : }
1668 : :
1021 akapila@postgresql.o 1669 [ + + ]: 140 : if (IsSet(opts.specified_opts, SUBOPT_RUN_AS_OWNER))
1670 : : {
1671 : 9 : values[Anum_pg_subscription_subrunasowner - 1] =
1672 : 9 : BoolGetDatum(opts.runasowner);
1673 : 9 : replaces[Anum_pg_subscription_subrunasowner - 1] = true;
1674 : : }
1675 : :
706 1676 [ + + ]: 140 : if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
1677 : : {
1678 : : /*
1679 : : * We need to update both the slot and the subscription
1680 : : * for the two_phase option. We can enable the two_phase
1681 : : * option for a slot only once the initial data
1682 : : * synchronization is done. This is to avoid missing some
1683 : : * data as explained in comments atop worker.c.
1684 : : */
1685 : 3 : update_two_phase = !opts.twophase;
1686 : :
1687 : 3 : CheckAlterSubOption(sub, "two_phase", update_two_phase,
1688 : : isTopLevel);
1689 : :
1690 : : /*
1691 : : * Modifying the two_phase slot option requires a slot
1692 : : * lookup by slot name, so changing the slot name at the
1693 : : * same time is not allowed.
1694 : : */
1695 [ + + ]: 3 : if (update_two_phase &&
1696 [ - + ]: 1 : IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
706 akapila@postgresql.o 1697 [ # # ]:UBC 0 : ereport(ERROR,
1698 : : (errcode(ERRCODE_SYNTAX_ERROR),
1699 : : errmsg("\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
1700 : :
1701 : : /*
1702 : : * Note that workers may still survive even if the
1703 : : * subscription has been disabled.
1704 : : *
1705 : : * Ensure workers have already been exited to avoid
1706 : : * getting prepared transactions while we are disabling
1707 : : * the two_phase option. Otherwise, the changes of an
1708 : : * already prepared transaction can be replicated again
1709 : : * along with its corresponding commit, leading to
1710 : : * duplicate data or errors.
1711 : : */
706 akapila@postgresql.o 1712 [ - + ]:CBC 3 : if (logicalrep_workers_find(subid, true, true))
882 akapila@postgresql.o 1713 [ # # ]:UBC 0 : ereport(ERROR,
1714 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1715 : : errmsg("cannot alter \"two_phase\" when logical replication worker is still running"),
1716 : : errhint("Try again after some time.")));
1717 : :
1718 : : /*
1719 : : * two_phase cannot be disabled if there are any
1720 : : * uncommitted prepared transactions present otherwise it
1721 : : * can lead to duplicate data or errors as explained in
1722 : : * the comment above.
1723 : : */
706 akapila@postgresql.o 1724 [ + + ]:CBC 3 : if (update_two_phase &&
1725 [ + - ]: 1 : sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
1726 [ - + ]: 1 : LookupGXactBySubid(subid))
882 akapila@postgresql.o 1727 [ # # ]:UBC 0 : ereport(ERROR,
1728 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1729 : : errmsg("cannot disable \"two_phase\" when prepared transactions exist"),
1730 : : errhint("Resolve these transactions and try again.")));
1731 : :
1732 : : /* Change system catalog accordingly */
706 akapila@postgresql.o 1733 :CBC 3 : values[Anum_pg_subscription_subtwophasestate - 1] =
1734 [ + + ]: 3 : CharGetDatum(opts.twophase ?
1735 : : LOGICALREP_TWOPHASE_STATE_PENDING :
1736 : : LOGICALREP_TWOPHASE_STATE_DISABLED);
1737 : 3 : replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1738 : : }
1739 : :
1740 [ + + ]: 140 : if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
1741 : : {
1742 : : /*
1743 : : * Similar to the two_phase case above, we need to update
1744 : : * the failover option for both the slot and the
1745 : : * subscription.
1746 : : */
1747 : 9 : update_failover = true;
1748 : :
1749 : 9 : CheckAlterSubOption(sub, "failover", update_failover,
1750 : : isTopLevel);
1751 : :
882 1752 : 4 : values[Anum_pg_subscription_subfailover - 1] =
1753 : 4 : BoolGetDatum(opts.failover);
1754 : 4 : replaces[Anum_pg_subscription_subfailover - 1] = true;
1755 : : }
1756 : :
342 akapila@postgresql.o 1757 [ + + ]:GNC 135 : if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
1758 : : {
1759 : 2 : values[Anum_pg_subscription_subretaindeadtuples - 1] =
1760 : 2 : BoolGetDatum(opts.retaindeadtuples);
1761 : 2 : replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true;
1762 : :
1763 : : /*
1764 : : * Update the retention status only if there's a change in
1765 : : * the retain_dead_tuples option value.
1766 : : *
1767 : : * Automatically marking retention as active when
1768 : : * retain_dead_tuples is enabled may not always be ideal,
1769 : : * especially if retention was previously stopped and the
1770 : : * user toggles retain_dead_tuples without adjusting the
1771 : : * publisher workload. However, this behavior provides a
1772 : : * convenient way for users to manually refresh the
1773 : : * retention status. Since retention will be stopped again
1774 : : * unless the publisher workload is reduced, this approach
1775 : : * is acceptable for now.
1776 : : */
301 1777 [ + - ]: 2 : if (opts.retaindeadtuples != sub->retaindeadtuples)
1778 : : {
1779 : 2 : values[Anum_pg_subscription_subretentionactive - 1] =
1780 : 2 : BoolGetDatum(opts.retaindeadtuples);
1781 : 2 : replaces[Anum_pg_subscription_subretentionactive - 1] = true;
1782 : :
1783 : 2 : retention_active = opts.retaindeadtuples;
1784 : : }
1785 : :
342 1786 : 2 : CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
1787 : :
1788 : : /*
1789 : : * Workers may continue running even after the
1790 : : * subscription has been disabled.
1791 : : *
1792 : : * To prevent race conditions (as described in
1793 : : * CheckAlterSubOption()), ensure that all worker
1794 : : * processes have already exited before proceeding.
1795 : : */
1796 [ - + ]: 1 : if (logicalrep_workers_find(subid, true, true))
342 akapila@postgresql.o 1797 [ # # ]:UNC 0 : ereport(ERROR,
1798 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1799 : : errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
1800 : : errhint("Try again after some time.")));
1801 : :
1802 : : /*
1803 : : * Notify the launcher to manage the replication slot for
1804 : : * conflict detection. This ensures that replication slot
1805 : : * is efficiently handled (created, updated, or dropped)
1806 : : * in response to any configuration changes.
1807 : : */
342 akapila@postgresql.o 1808 :GNC 1 : ApplyLauncherWakeupAtCommit();
1809 : :
1810 : 1 : check_pub_rdt = opts.retaindeadtuples;
1811 : 1 : retain_dead_tuples = opts.retaindeadtuples;
1812 : : }
1813 : :
301 1814 [ + + ]: 134 : if (IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1815 : : {
1816 : 6 : values[Anum_pg_subscription_submaxretention - 1] =
1817 : 6 : Int32GetDatum(opts.maxretention);
1818 : 6 : replaces[Anum_pg_subscription_submaxretention - 1] = true;
1819 : :
1820 : 6 : max_retention = opts.maxretention;
1821 : : }
1822 : :
1823 : : /*
1824 : : * Ensure that system configuration parameters are set
1825 : : * appropriately to support retain_dead_tuples and
1826 : : * max_retention_duration.
1827 : : */
1828 [ + + ]: 134 : if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) ||
1829 [ + + ]: 133 : IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1830 : 7 : CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE,
1831 : : retain_dead_tuples,
1832 : : retention_active,
1833 : 7 : (max_retention > 0));
1834 : :
1440 akapila@postgresql.o 1835 [ + + ]:CBC 134 : if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
1836 : : {
1837 : 6 : values[Anum_pg_subscription_suborigin - 1] =
1838 : 6 : CStringGetTextDatum(opts.origin);
1839 : 6 : replaces[Anum_pg_subscription_suborigin - 1] = true;
1840 : :
1841 : : /*
1842 : : * Check if changes from different origins may be received
1843 : : * from the publisher when the origin is changed to ANY
1844 : : * and retain_dead_tuples is enabled. Use |= so that we
1845 : : * don't clear the flag already set when
1846 : : * retain_dead_tuples was changed in the same command.
1847 : : */
22 akapila@postgresql.o 1848 [ + + ]:GNC 8 : check_pub_rdt |= retain_dead_tuples &&
342 1849 [ + + ]: 2 : pg_strcasecmp(opts.origin, LOGICALREP_ORIGIN_ANY) == 0;
1850 : :
1851 : 6 : origin = opts.origin;
1852 : : }
1853 : :
130 fujii@postgresql.org 1854 [ + + ]: 134 : if (IsSet(opts.specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
1855 : : {
1856 : 8 : values[Anum_pg_subscription_subwalrcvtimeout - 1] =
1857 : 8 : CStringGetTextDatum(opts.wal_receiver_timeout);
1858 : 8 : replaces[Anum_pg_subscription_subwalrcvtimeout - 1] = true;
1859 : : }
1860 : :
3386 peter_e@gmx.net 1861 :CBC 134 : update_tuple = true;
1862 : 134 : break;
1863 : : }
1864 : :
1865 : 62 : case ALTER_SUBSCRIPTION_ENABLED:
1866 : : {
1820 akapila@postgresql.o 1867 [ - + ]: 62 : Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
1868 : :
1869 [ + + + - ]: 62 : if (!sub->slotname && opts.enabled)
3339 peter_e@gmx.net 1870 [ + - ]: 4 : ereport(ERROR,
1871 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1872 : : errmsg("cannot enable subscription that does not have a slot name")));
1873 : :
1874 : : /*
1875 : : * Check track_commit_timestamp only when enabling the
1876 : : * subscription in case it was disabled after creation. See
1877 : : * comments atop CheckSubDeadTupleRetention() for details.
1878 : : */
301 akapila@postgresql.o 1879 :GNC 58 : CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
1880 : 58 : WARNING, sub->retaindeadtuples,
1881 : 58 : sub->retentionactive, false);
1882 : :
3386 peter_e@gmx.net 1883 :CBC 58 : values[Anum_pg_subscription_subenabled - 1] =
1820 akapila@postgresql.o 1884 : 58 : BoolGetDatum(opts.enabled);
3386 peter_e@gmx.net 1885 : 58 : replaces[Anum_pg_subscription_subenabled - 1] = true;
1886 : :
1820 akapila@postgresql.o 1887 [ + + ]: 58 : if (opts.enabled)
3353 peter_e@gmx.net 1888 : 30 : ApplyLauncherWakeupAtCommit();
1889 : :
3386 1890 : 58 : update_tuple = true;
1891 : :
1892 : : /*
1893 : : * The subscription might be initially created with
1894 : : * connect=false and retain_dead_tuples=true, meaning the
1895 : : * remote server's status may not be checked. Ensure this
1896 : : * check is conducted now.
1897 : : */
342 akapila@postgresql.o 1898 [ + + + + ]:GNC 58 : check_pub_rdt = sub->retaindeadtuples && opts.enabled;
3386 peter_e@gmx.net 1899 :CBC 58 : break;
1900 : : }
1901 : :
116 jdavis@postgresql.or 1902 :GNC 1 : case ALTER_SUBSCRIPTION_SERVER:
1903 : : {
1904 : : ForeignServer *new_server;
1905 : : ObjectAddress referenced;
1906 : : AclResult aclresult;
1907 : :
1908 : : /*
1909 : : * Remove what was there before, either another foreign server
1910 : : * or a connection string.
1911 : : */
1912 [ - + ]: 1 : if (form->subserver)
1913 : : {
116 jdavis@postgresql.or 1914 :UNC 0 : deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
1915 : : DEPENDENCY_NORMAL,
1916 : : ForeignServerRelationId, form->subserver);
1917 : : }
1918 : : else
1919 : : {
116 jdavis@postgresql.or 1920 :GNC 1 : nulls[Anum_pg_subscription_subconninfo - 1] = true;
1921 : 1 : replaces[Anum_pg_subscription_subconninfo - 1] = true;
1922 : : }
1923 : :
1924 : : /*
1925 : : * Check that the subscription owner has USAGE privileges on
1926 : : * the server.
1927 : : */
1928 : 1 : new_server = GetForeignServerByName(stmt->servername, false);
1929 : 1 : aclresult = object_aclcheck(ForeignServerRelationId,
1930 : : new_server->serverid,
1931 : : form->subowner, ACL_USAGE);
1932 [ - + ]: 1 : if (aclresult != ACLCHECK_OK)
116 jdavis@postgresql.or 1933 [ # # ]:UNC 0 : ereport(ERROR,
1934 : : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1935 : : errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
1936 : : GetUserNameFromId(form->subowner, false),
1937 : : new_server->servername));
1938 : :
1939 : : /* make sure a user mapping exists */
116 jdavis@postgresql.or 1940 :GNC 1 : GetUserMapping(form->subowner, new_server->serverid);
1941 : :
46 1942 : 1 : new_conninfo = ForeignServerConnectionString(form->subowner,
1943 : : new_server);
1944 : :
1945 : : /* Load the library providing us libpq calls. */
116 1946 : 1 : load_file("libpqwalreceiver", false);
1947 : : /* Check the connection info string. */
46 1948 [ - + - - ]: 1 : walrcv_check_conninfo(new_conninfo,
1949 : : sub->passwordrequired && !sub->ownersuperuser);
1950 : :
71 peter@eisentraut.org 1951 : 1 : values[Anum_pg_subscription_subserver - 1] = ObjectIdGetDatum(new_server->serverid);
116 jdavis@postgresql.or 1952 : 1 : replaces[Anum_pg_subscription_subserver - 1] = true;
1953 : :
1954 : 1 : ObjectAddressSet(referenced, ForeignServerRelationId, new_server->serverid);
1955 : 1 : recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
1956 : :
1957 : 1 : update_tuple = true;
1958 : : }
1959 : :
1960 : : /*
1961 : : * Since the remote server configuration might have changed,
1962 : : * perform a check to ensure it permits enabling
1963 : : * retain_dead_tuples.
1964 : : */
46 1965 : 1 : check_pub_rdt = sub->retaindeadtuples;
116 1966 : 1 : break;
1967 : :
3386 peter_e@gmx.net 1968 :CBC 22 : case ALTER_SUBSCRIPTION_CONNECTION:
1969 : : /* remove reference to foreign server and dependencies, if present */
116 jdavis@postgresql.or 1970 [ + + ]:GNC 22 : if (form->subserver)
1971 : : {
1972 : 9 : deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
1973 : : DEPENDENCY_NORMAL,
1974 : : ForeignServerRelationId, form->subserver);
1975 : :
71 peter@eisentraut.org 1976 : 9 : values[Anum_pg_subscription_subserver - 1] = ObjectIdGetDatum(InvalidOid);
116 jdavis@postgresql.or 1977 : 9 : replaces[Anum_pg_subscription_subserver - 1] = true;
1978 : : }
1979 : :
46 1980 : 22 : new_conninfo = stmt->conninfo;
1981 : :
1982 : : /* Load the library providing us libpq calls. */
3340 peter_e@gmx.net 1983 :CBC 22 : load_file("libpqwalreceiver", false);
1984 : : /* Check the connection info string. */
46 jdavis@postgresql.or 1985 [ + + + + ]:GNC 22 : walrcv_check_conninfo(new_conninfo,
1986 : : sub->passwordrequired && !sub->ownersuperuser);
1987 : :
3386 peter_e@gmx.net 1988 :CBC 18 : values[Anum_pg_subscription_subconninfo - 1] =
1989 : 18 : CStringGetTextDatum(stmt->conninfo);
1990 : 18 : replaces[Anum_pg_subscription_subconninfo - 1] = true;
1991 : 18 : update_tuple = true;
1992 : :
1993 : : /*
1994 : : * Since the remote server configuration might have changed,
1995 : : * perform a check to ensure it permits enabling
1996 : : * retain_dead_tuples.
1997 : : */
342 akapila@postgresql.o 1998 :GNC 18 : check_pub_rdt = sub->retaindeadtuples;
3386 peter_e@gmx.net 1999 :CBC 18 : break;
2000 : :
1911 peter@eisentraut.org 2001 : 19 : case ALTER_SUBSCRIPTION_SET_PUBLICATION:
2002 : : {
3386 peter_e@gmx.net 2003 : 19 : values[Anum_pg_subscription_subpublications - 1] =
3331 bruce@momjian.us 2004 : 19 : publicationListToArray(stmt->publication);
3386 peter_e@gmx.net 2005 : 19 : replaces[Anum_pg_subscription_subpublications - 1] = true;
2006 : :
2007 : 19 : update_tuple = true;
2008 : :
2009 : : /* Refresh if user asked us to. */
1820 akapila@postgresql.o 2010 [ + + ]: 19 : if (opts.refresh)
2011 : : {
3339 peter_e@gmx.net 2012 [ - + ]: 15 : if (!sub->enabled)
3339 peter_e@gmx.net 2013 [ # # ]:UBC 0 : ereport(ERROR,
2014 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2015 : : errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
2016 : : errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
2017 : :
2018 : : /*
2019 : : * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
2020 : : * why this is not allowed.
2021 : : */
1812 akapila@postgresql.o 2022 [ - + - - ]:CBC 15 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1812 akapila@postgresql.o 2023 [ # # ]:UBC 0 : ereport(ERROR,
2024 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2025 : : errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
2026 : : errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
2027 : :
1964 akapila@postgresql.o 2028 :CBC 15 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
2029 : :
2030 : : /* Make sure refresh sees the new list of publications. */
3386 peter_e@gmx.net 2031 : 7 : sub->publications = stmt->publication;
2032 : :
1552 akapila@postgresql.o 2033 : 7 : AlterSubscription_refresh(sub, opts.copy_data,
2034 : : stmt->publication);
2035 : : }
2036 : :
3386 peter_e@gmx.net 2037 : 11 : break;
2038 : : }
2039 : :
1911 peter@eisentraut.org 2040 : 35 : case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
2041 : : case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
2042 : : {
2043 : : List *publist;
1820 akapila@postgresql.o 2044 : 35 : bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
2045 : :
2046 : 35 : publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
1911 peter@eisentraut.org 2047 : 11 : values[Anum_pg_subscription_subpublications - 1] =
2048 : 11 : publicationListToArray(publist);
2049 : 11 : replaces[Anum_pg_subscription_subpublications - 1] = true;
2050 : :
2051 : 11 : update_tuple = true;
2052 : :
2053 : : /* Refresh if user asked us to. */
1820 akapila@postgresql.o 2054 [ + + ]: 11 : if (opts.refresh)
2055 : : {
2056 : : /* We only need to validate user specified publications. */
1552 2057 [ + + ]: 3 : List *validate_publications = (isadd) ? stmt->publication : NULL;
2058 : :
1911 peter@eisentraut.org 2059 [ - + ]: 3 : if (!sub->enabled)
1911 peter@eisentraut.org 2060 [ # # # # ]:UBC 0 : ereport(ERROR,
2061 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2062 : : errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
2063 : : /* translator: %s is an SQL ALTER command */
2064 : : errhint("Use %s instead.",
2065 : : isadd ?
2066 : : "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
2067 : : "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
2068 : :
2069 : : /*
2070 : : * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
2071 : : * why this is not allowed.
2072 : : */
1812 akapila@postgresql.o 2073 [ - + - - ]:CBC 3 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1812 akapila@postgresql.o 2074 [ # # # # ]:UBC 0 : ereport(ERROR,
2075 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2076 : : errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
2077 : : /* translator: %s is an SQL ALTER command */
2078 : : errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
2079 : : isadd ?
2080 : : "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
2081 : : "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
2082 : :
1911 peter@eisentraut.org 2083 :CBC 3 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
2084 : :
2085 : : /* Refresh the new list of publications. */
1771 akapila@postgresql.o 2086 : 3 : sub->publications = publist;
2087 : :
1552 2088 : 3 : AlterSubscription_refresh(sub, opts.copy_data,
2089 : : validate_publications);
2090 : : }
2091 : :
1911 peter@eisentraut.org 2092 : 11 : break;
2093 : : }
2094 : :
258 akapila@postgresql.o 2095 :GNC 37 : case ALTER_SUBSCRIPTION_REFRESH_PUBLICATION:
2096 : : {
3339 peter_e@gmx.net 2097 [ + + ]:CBC 37 : if (!sub->enabled)
2098 [ + - ]: 4 : ereport(ERROR,
2099 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2100 : : errmsg("%s is not allowed for disabled subscriptions",
2101 : : "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
2102 : :
2103 : : /*
2104 : : * The subscription option "two_phase" requires that
2105 : : * replication has passed the initial table synchronization
2106 : : * phase before the two_phase becomes properly enabled.
2107 : : *
2108 : : * But, having reached this two-phase commit "enabled" state
2109 : : * we must not allow any subsequent table initialization to
2110 : : * occur. So the ALTER SUBSCRIPTION ... REFRESH PUBLICATION is
2111 : : * disallowed when the user had requested two_phase = on mode.
2112 : : *
2113 : : * The exception to this restriction is when copy_data =
2114 : : * false, because when copy_data is false the tablesync will
2115 : : * start already in READY state and will exit directly without
2116 : : * doing anything.
2117 : : *
2118 : : * For more details see comments atop worker.c.
2119 : : */
1812 akapila@postgresql.o 2120 [ - + - - ]: 33 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1812 akapila@postgresql.o 2121 [ # # ]:UBC 0 : ereport(ERROR,
2122 : : (errcode(ERRCODE_SYNTAX_ERROR),
2123 : : errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"),
2124 : : errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
2125 : :
258 akapila@postgresql.o 2126 :GNC 33 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION");
2127 : :
1552 akapila@postgresql.o 2128 :CBC 29 : AlterSubscription_refresh(sub, opts.copy_data, NULL);
2129 : :
3386 peter_e@gmx.net 2130 : 28 : break;
2131 : : }
2132 : :
250 akapila@postgresql.o 2133 :GNC 3 : case ALTER_SUBSCRIPTION_REFRESH_SEQUENCES:
2134 : : {
2135 [ - + ]: 3 : if (!sub->enabled)
250 akapila@postgresql.o 2136 [ # # ]:UNC 0 : ereport(ERROR,
2137 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2138 : : errmsg("%s is not allowed for disabled subscriptions",
2139 : : "ALTER SUBSCRIPTION ... REFRESH SEQUENCES"));
2140 : :
250 akapila@postgresql.o 2141 :GNC 3 : AlterSubscription_refresh_seq(sub);
2142 : :
2143 : 3 : break;
2144 : : }
2145 : :
1561 2146 : 11 : case ALTER_SUBSCRIPTION_SKIP:
2147 : : {
2148 : : /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
1561 akapila@postgresql.o 2149 [ - + ]:CBC 11 : Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
2150 : :
2151 : : /*
2152 : : * If the user sets subskiplsn, we do a sanity check to make
2153 : : * sure that the specified LSN is a probable value.
2154 : : */
236 alvherre@kurilemu.de 2155 [ + + ]:GNC 11 : if (XLogRecPtrIsValid(opts.lsn))
2156 : : {
2157 : : ReplOriginId originid;
2158 : : char originname[NAMEDATALEN];
2159 : : XLogRecPtr remote_lsn;
2160 : :
1358 akapila@postgresql.o 2161 :CBC 7 : ReplicationOriginNameForLogicalRep(subid, InvalidOid,
2162 : : originname, sizeof(originname));
1561 2163 : 7 : originid = replorigin_by_name(originname, false);
2164 : 7 : remote_lsn = replorigin_get_progress(originid, false);
2165 : :
2166 : : /* Check the given LSN is at least a future LSN */
236 alvherre@kurilemu.de 2167 [ + + - + ]:GNC 7 : if (XLogRecPtrIsValid(remote_lsn) && opts.lsn < remote_lsn)
1561 akapila@postgresql.o 2168 [ # # ]:UBC 0 : ereport(ERROR,
2169 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2170 : : errmsg("skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
2171 : : LSN_FORMAT_ARGS(opts.lsn),
2172 : : LSN_FORMAT_ARGS(remote_lsn))));
2173 : : }
2174 : :
1561 akapila@postgresql.o 2175 :CBC 11 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
2176 : 11 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
2177 : :
2178 : 11 : update_tuple = true;
2179 : 11 : break;
2180 : : }
2181 : :
3386 peter_e@gmx.net 2182 :UBC 0 : default:
2183 [ # # ]: 0 : elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
2184 : : stmt->kind);
2185 : : }
2186 : :
2187 : : /* Update the catalog if needed. */
3386 peter_e@gmx.net 2188 [ + + ]:CBC 275 : if (update_tuple)
2189 : : {
2190 : 244 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
2191 : : replaces);
2192 : :
2193 : 244 : CatalogTupleUpdate(rel, &tup->t_self, tup);
2194 : :
2195 : 244 : heap_freetuple(tup);
2196 : : }
2197 : :
2198 : : /*
2199 : : * Try to acquire the connection necessary either for modifying the slot
2200 : : * or for checking if the remote server permits enabling
2201 : : * retain_dead_tuples.
2202 : : *
2203 : : * This has to be at the end because otherwise if there is an error while
2204 : : * doing the database operations we won't be able to rollback altered
2205 : : * slot.
2206 : : */
342 akapila@postgresql.o 2207 [ + + + + :GNC 275 : if (update_failover || update_two_phase || check_pub_rdt)
+ + ]
2208 : : {
2209 : : bool must_use_password;
2210 : : char *err;
2211 : : WalReceiverConn *wrconn;
2212 : :
13 jdavis@postgresql.or 2213 [ + - - + ]: 13 : Assert(new_conninfo || orig_conninfo_needed);
2214 : :
2215 : : /* Load the library providing us libpq calls. */
882 akapila@postgresql.o 2216 :CBC 13 : load_file("libpqwalreceiver", false);
2217 : :
2218 : : /*
2219 : : * Try to connect to the publisher, using the new connection string if
2220 : : * available.
2221 : : */
2222 [ + - - + ]: 13 : must_use_password = sub->passwordrequired && !sub->ownersuperuser;
46 jdavis@postgresql.or 2223 [ + - ]:GNC 13 : wrconn = walrcv_connect(new_conninfo ? new_conninfo : sub->conninfo,
2224 : : true, true, must_use_password, sub->name,
2225 : : &err);
882 akapila@postgresql.o 2226 [ - + ]:CBC 13 : if (!wrconn)
882 akapila@postgresql.o 2227 [ # # ]:UBC 0 : ereport(ERROR,
2228 : : (errcode(ERRCODE_CONNECTION_FAILURE),
2229 : : errmsg("subscription \"%s\" could not connect to the publisher: %s",
2230 : : sub->name, err)));
2231 : :
882 akapila@postgresql.o 2232 [ + - ]:CBC 13 : PG_TRY();
2233 : : {
342 akapila@postgresql.o 2234 [ + + ]:GNC 13 : if (retain_dead_tuples)
2235 : 9 : check_pub_dead_tuple_retention(wrconn);
2236 : :
250 2237 : 13 : check_publications_origin_tables(wrconn, sub->publications, false,
2238 : : retain_dead_tuples, origin, NULL, 0,
2239 : : sub->name);
2240 : :
342 2241 [ + + + + ]: 13 : if (update_failover || update_two_phase)
2242 [ + + + + ]: 5 : walrcv_alter_slot(wrconn, sub->slotname,
2243 : : update_failover ? &opts.failover : NULL,
2244 : : update_two_phase ? &opts.twophase : NULL);
2245 : : }
882 akapila@postgresql.o 2246 :UBC 0 : PG_FINALLY();
2247 : : {
882 akapila@postgresql.o 2248 :CBC 13 : walrcv_disconnect(wrconn);
2249 : : }
2250 [ - + ]: 13 : PG_END_TRY();
2251 : : }
2252 : :
2717 andres@anarazel.de 2253 : 275 : table_close(rel, RowExclusiveLock);
2254 : :
3449 peter_e@gmx.net 2255 [ - + ]: 275 : InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
2256 : :
2257 : : /* Wake up related replication workers to handle this change quickly. */
1271 tgl@sss.pgh.pa.us 2258 : 275 : LogicalRepWorkersWakeupAtCommit(subid);
2259 : :
3449 peter_e@gmx.net 2260 : 275 : return myself;
2261 : : }
2262 : :
2263 : : /*
2264 : : * Construct conninfo from a subscription's server. Like libpqrcv_connect(),
2265 : : * if an error occurs, set *err to the error message and return NULL.
2266 : : *
2267 : : * However, failures in ForeignServerConnectionString() may ereport(ERROR),
2268 : : * and (also like libpqrcv_connect) it's not worth adding the machinery to
2269 : : * pass all of those back to the caller just to cover this one case.
2270 : : */
2271 : : static char *
12 jdavis@postgresql.or 2272 :GNC 8 : construct_subserver_conninfo(Oid subserver, Oid subowner, char **err)
2273 : : {
2274 : : AclResult aclresult;
2275 : : ForeignServer *server;
2276 : :
2277 : 8 : *err = NULL;
2278 : :
2279 : 8 : server = GetForeignServer(subserver);
2280 : :
2281 : 8 : aclresult = object_aclcheck(ForeignServerRelationId, subserver,
2282 : : subowner, ACL_USAGE);
2283 [ + + ]: 8 : if (aclresult != ACLCHECK_OK)
2284 : : {
2285 : : /*
2286 : : * Unable to generate connection string because permissions on the
2287 : : * foreign server have been removed. Follow the same logic as an
2288 : : * unusable subconninfo (which will result in an ERROR later unless
2289 : : * slot_name = NONE).
2290 : : */
2291 : 4 : *err = psprintf(_("subscription owner \"%s\" does not have permission on foreign server \"%s\""),
2292 : : GetUserNameFromId(subowner, false),
2293 : : server->servername);
2294 : 4 : return NULL;
2295 : : }
2296 : :
2297 : 4 : return ForeignServerConnectionString(subowner, server);
2298 : : }
2299 : :
2300 : : /*
2301 : : * Drop a subscription
2302 : : */
2303 : : void
3406 peter_e@gmx.net 2304 :CBC 162 : DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
2305 : : {
2306 : : Relation rel;
2307 : : ObjectAddress myself;
2308 : : HeapTuple tup;
2309 : : Oid subid;
2310 : : Oid subowner;
2311 : : Oid subserver;
12 jdavis@postgresql.or 2312 :GNC 162 : char *subconninfo = NULL;
2313 : : Datum datum;
2314 : : bool isnull;
2315 : : char *subname;
2316 : 162 : char *conninfo = NULL;
2317 : : char *slotname;
2318 : : List *subworkers;
2319 : : ListCell *lc;
2320 : : char originname[NAMEDATALEN];
3449 peter_e@gmx.net 2321 :CBC 162 : char *err = NULL;
116 jdavis@postgresql.or 2322 :GNC 162 : WalReceiverConn *wrconn = NULL;
2323 : : Form_pg_subscription form;
2324 : : List *rstates;
2325 : : bool must_use_password;
2326 : :
2327 : : /*
2328 : : * The launcher may concurrently start a new worker for this subscription.
2329 : : * During initialization, the worker checks for subscription validity and
2330 : : * exits if the subscription has already been dropped. See
2331 : : * InitializeLogRepWorker.
2332 : : */
315 akapila@postgresql.o 2333 :CBC 162 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
2334 : :
326 peter@eisentraut.org 2335 :GNC 162 : tup = SearchSysCache2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
3449 peter_e@gmx.net 2336 :CBC 162 : CStringGetDatum(stmt->subname));
2337 : :
2338 [ + + ]: 162 : if (!HeapTupleIsValid(tup))
2339 : : {
2717 andres@anarazel.de 2340 : 8 : table_close(rel, NoLock);
2341 : :
3449 peter_e@gmx.net 2342 [ + + ]: 8 : if (!stmt->missing_ok)
2343 [ + - ]: 4 : ereport(ERROR,
2344 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
2345 : : errmsg("subscription \"%s\" does not exist",
2346 : : stmt->subname)));
2347 : : else
2348 [ + - ]: 4 : ereport(NOTICE,
2349 : : (errmsg("subscription \"%s\" does not exist, skipping",
2350 : : stmt->subname)));
2351 : :
2352 : 68 : return;
2353 : : }
2354 : :
12 jdavis@postgresql.or 2355 :GNC 154 : datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
2356 : : Anum_pg_subscription_subconninfo, &isnull);
2357 [ + + ]: 154 : if (!isnull)
2358 : 137 : subconninfo = TextDatumGetCString(datum);
2359 : :
2779 andres@anarazel.de 2360 :CBC 154 : form = (Form_pg_subscription) GETSTRUCT(tup);
2361 : 154 : subid = form->oid;
1188 rhaas@postgresql.org 2362 : 154 : subowner = form->subowner;
12 jdavis@postgresql.or 2363 :GNC 154 : subserver = form->subserver;
1188 rhaas@postgresql.org 2364 [ + + + + ]:CBC 154 : must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
2365 : :
2366 : : /* must be owner */
1325 peter@eisentraut.org 2367 [ - + ]: 154 : if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
3132 peter_e@gmx.net 2368 :UBC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
3449 2369 : 0 : stmt->subname);
2370 : :
2371 : : /* DROP hook for the subscription being removed */
3449 peter_e@gmx.net 2372 [ - + ]:CBC 154 : InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
2373 : :
2374 : : /*
2375 : : * Lock the subscription so nobody else can do anything with it (including
2376 : : * the replication workers).
2377 : : */
2378 : 154 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
2379 : :
2380 : : /* Get subname */
1193 dgustafsson@postgres 2381 : 154 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
2382 : : Anum_pg_subscription_subname);
3449 peter_e@gmx.net 2383 : 154 : subname = pstrdup(NameStr(*DatumGetName(datum)));
2384 : :
2385 : : /* Get slotname */
2386 : 154 : datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
2387 : : Anum_pg_subscription_subslotname, &isnull);
3339 2388 [ + + ]: 154 : if (!isnull)
2389 : 89 : slotname = pstrdup(NameStr(*DatumGetName(datum)));
2390 : : else
2391 : 65 : slotname = NULL;
2392 : :
2393 : : /*
2394 : : * Since dropping a replication slot is not transactional, the replication
2395 : : * slot stays dropped even if the transaction rolls back. So we cannot
2396 : : * run DROP SUBSCRIPTION inside a transaction block if dropping the
2397 : : * replication slot. Also, in this case, we report a message for dropping
2398 : : * the subscription to the cumulative stats system.
2399 : : *
2400 : : * XXX The command name should really be something like "DROP SUBSCRIPTION
2401 : : * of a subscription that is associated with a replication slot", but we
2402 : : * don't have the proper facilities for that.
2403 : : */
2404 [ + + ]: 154 : if (slotname)
3056 2405 : 89 : PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
2406 : :
3449 2407 : 150 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
2408 : 150 : EventTriggerSQLDropAddObject(&myself, true, true);
2409 : :
2410 : : /* Remove the tuple from catalog. */
3436 tgl@sss.pgh.pa.us 2411 : 150 : CatalogTupleDelete(rel, &tup->t_self);
2412 : :
3449 peter_e@gmx.net 2413 : 150 : ReleaseSysCache(tup);
2414 : :
2415 : : /*
2416 : : * Stop all the subscription workers immediately.
2417 : : *
2418 : : * This is necessary if we are dropping the replication slot, so that the
2419 : : * slot becomes accessible.
2420 : : *
2421 : : * It is also necessary if the subscription is disabled and was disabled
2422 : : * in the same transaction. Then the workers haven't seen the disabling
2423 : : * yet and will still be running, leading to hangs later when we want to
2424 : : * drop the replication origin. If the subscription was disabled before
2425 : : * this transaction, then there shouldn't be any workers left, so this
2426 : : * won't make a difference.
2427 : : *
2428 : : * New workers won't be started because we hold an exclusive lock on the
2429 : : * subscription till the end of the transaction.
2430 : : */
706 akapila@postgresql.o 2431 : 150 : subworkers = logicalrep_workers_find(subid, false, true);
3242 tgl@sss.pgh.pa.us 2432 [ + + + + : 231 : foreach(lc, subworkers)
+ + ]
2433 : : {
3252 peter_e@gmx.net 2434 : 81 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
2435 : :
245 akapila@postgresql.o 2436 :GNC 81 : logicalrep_worker_stop(w->type, w->subid, w->relid);
2437 : : }
3252 peter_e@gmx.net 2438 :CBC 150 : list_free(subworkers);
2439 : :
2440 : : /*
2441 : : * Remove the no-longer-useful entry in the launcher's table of apply
2442 : : * worker start times.
2443 : : *
2444 : : * If this transaction rolls back, the launcher might restart a failed
2445 : : * apply worker before wal_retrieve_retry_interval milliseconds have
2446 : : * elapsed, but that's pretty harmless.
2447 : : */
1255 tgl@sss.pgh.pa.us 2448 : 150 : ApplyLauncherForgetWorkerStartTime(subid);
2449 : :
2450 : : /*
2451 : : * Cleanup of tablesync replication origins.
2452 : : *
2453 : : * Any READY-state relations would already have dealt with clean-ups.
2454 : : *
2455 : : * Note that the state can't change because we have already stopped both
2456 : : * the apply and tablesync workers and they can't restart because of
2457 : : * exclusive lock on the subscription.
2458 : : */
250 akapila@postgresql.o 2459 :GNC 150 : rstates = GetSubscriptionRelations(subid, true, false, true);
1964 akapila@postgresql.o 2460 [ + + + + :CBC 156 : foreach(lc, rstates)
+ + ]
2461 : : {
2462 : 6 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
2463 : 6 : Oid relid = rstate->relid;
2464 : :
2465 : : /* Only cleanup resources of tablesync workers */
2466 [ - + ]: 6 : if (!OidIsValid(relid))
1964 akapila@postgresql.o 2467 :UBC 0 : continue;
2468 : :
2469 : : /*
2470 : : * Drop the tablesync's origin tracking if exists.
2471 : : *
2472 : : * It is possible that the origin is not yet created for tablesync
2473 : : * worker so passing missing_ok = true. This can happen for the states
2474 : : * before SUBREL_STATE_DATASYNC.
2475 : : */
1358 akapila@postgresql.o 2476 :CBC 6 : ReplicationOriginNameForLogicalRep(subid, relid, originname,
2477 : : sizeof(originname));
1387 2478 : 6 : replorigin_drop_by_name(originname, true, false);
2479 : : }
2480 : :
2481 : : /* Clean up dependencies */
116 jdavis@postgresql.or 2482 :GNC 150 : deleteDependencyRecordsFor(SubscriptionRelationId, subid, false);
3448 alvherre@alvh.no-ip. 2483 :CBC 150 : deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
2484 : :
2485 : : /* Remove any associated relation synchronization states. */
3386 peter_e@gmx.net 2486 : 150 : RemoveSubscriptionRel(subid, InvalidOid);
2487 : :
2488 : : /* Remove the origin tracking if exists. */
1358 akapila@postgresql.o 2489 : 150 : ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
1966 2490 : 150 : replorigin_drop_by_name(originname, true, false);
2491 : :
2492 : : /*
2493 : : * Tell the cumulative stats system that the subscription is getting
2494 : : * dropped.
2495 : : */
1091 msawada@postgresql.o 2496 : 150 : pgstat_drop_subscription(subid);
2497 : :
2498 : : /*
2499 : : * If there is no slot associated with the subscription, we can finish
2500 : : * here.
2501 : : */
1964 akapila@postgresql.o 2502 [ + + + + ]: 150 : if (!slotname && rstates == NIL)
2503 : : {
2717 andres@anarazel.de 2504 : 64 : table_close(rel, NoLock);
3449 peter_e@gmx.net 2505 : 64 : return;
2506 : : }
2507 : :
2508 : : /*
2509 : : * Try to acquire the connection necessary for dropping slots.
2510 : : *
2511 : : * Note: If the slotname is NONE/NULL then we allow the command to finish
2512 : : * and users need to manually cleanup the apply and tablesync worker slots
2513 : : * later.
2514 : : *
2515 : : * This has to be at the end because otherwise if there is an error while
2516 : : * doing the database operations we won't be able to rollback dropped
2517 : : * slot.
2518 : : */
2519 : 86 : load_file("libpqwalreceiver", false);
2520 : :
12 jdavis@postgresql.or 2521 [ + + ]:GNC 86 : if (OidIsValid(subserver))
2522 : 8 : conninfo = construct_subserver_conninfo(subserver, subowner, &err);
2523 : : else
2524 : 78 : conninfo = subconninfo;
2525 : :
116 2526 [ + + ]: 82 : if (conninfo)
2527 : 78 : wrconn = walrcv_connect(conninfo, true, true, must_use_password,
2528 : : subname, &err);
2529 : :
3449 peter_e@gmx.net 2530 [ + + ]:CBC 82 : if (wrconn == NULL)
2531 : : {
1964 akapila@postgresql.o 2532 [ - + ]:GBC 4 : if (!slotname)
2533 : : {
2534 : : /* be tidy */
1964 akapila@postgresql.o 2535 :UBC 0 : list_free(rstates);
2536 : 0 : table_close(rel, NoLock);
2537 : 0 : return;
2538 : : }
2539 : : else
2540 : : {
1964 akapila@postgresql.o 2541 :GBC 4 : ReportSlotConnectionError(rstates, subid, slotname, err);
2542 : : }
2543 : : }
2544 : :
1964 akapila@postgresql.o 2545 [ + + ]:CBC 78 : PG_TRY();
2546 : : {
2547 [ + + + + : 84 : foreach(lc, rstates)
+ + ]
2548 : : {
2549 : 6 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
2550 : 6 : Oid relid = rstate->relid;
2551 : :
2552 : : /* Only cleanup resources of tablesync workers */
2553 [ - + ]: 6 : if (!OidIsValid(relid))
1964 akapila@postgresql.o 2554 :UBC 0 : continue;
2555 : :
2556 : : /*
2557 : : * Drop the tablesync slots associated with removed tables.
2558 : : *
2559 : : * For SYNCDONE/READY states, the tablesync slot is known to have
2560 : : * already been dropped by the tablesync worker.
2561 : : *
2562 : : * For other states, there is no certainty, maybe the slot does
2563 : : * not exist yet. Also, if we fail after removing some of the
2564 : : * slots, next time, it will again try to drop already dropped
2565 : : * slots and fail. For these reasons, we allow missing_ok = true
2566 : : * for the drop.
2567 : : */
1964 akapila@postgresql.o 2568 [ + + ]:CBC 6 : if (rstate->state != SUBREL_STATE_SYNCDONE)
2569 : : {
2570 : 5 : char syncslotname[NAMEDATALEN] = {0};
2571 : :
1961 2572 : 5 : ReplicationSlotNameForTablesync(subid, relid, syncslotname,
2573 : : sizeof(syncslotname));
1964 2574 : 5 : ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
2575 : : }
2576 : : }
2577 : :
2578 : 78 : list_free(rstates);
2579 : :
2580 : : /*
2581 : : * If there is a slot associated with the subscription, then drop the
2582 : : * replication slot at the publisher.
2583 : : */
2584 [ + + ]: 78 : if (slotname)
2585 : 77 : ReplicationSlotDropAtPubNode(wrconn, slotname, false);
2586 : : }
2587 : 1 : PG_FINALLY();
2588 : : {
2589 : 78 : walrcv_disconnect(wrconn);
2590 : : }
2591 [ + + ]: 78 : PG_END_TRY();
2592 : :
2593 : 77 : table_close(rel, NoLock);
2594 : : }
2595 : :
2596 : : /*
2597 : : * Drop the replication slot at the publisher node using the replication
2598 : : * connection.
2599 : : *
2600 : : * missing_ok - if true then only issue a LOG message if the slot doesn't
2601 : : * exist.
2602 : : */
2603 : : void
2604 : 288 : ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
2605 : : {
2606 : : StringInfoData cmd;
2607 : :
2608 [ - + ]: 288 : Assert(wrconn);
2609 : :
2610 : 288 : load_file("libpqwalreceiver", false);
2611 : :
2612 : 288 : initStringInfo(&cmd);
15 tgl@sss.pgh.pa.us 2613 : 288 : appendStringInfoString(&cmd, "DROP_REPLICATION_SLOT ");
2614 : 288 : appendQuotedIdentifier(&cmd, slotname);
2615 : 288 : appendStringInfoString(&cmd, " WAIT");
2616 : :
3401 fujii@postgresql.org 2617 [ + + ]: 288 : PG_TRY();
2618 : : {
2619 : : WalRcvExecResult *res;
2620 : :
3386 peter_e@gmx.net 2621 : 288 : res = walrcv_exec(wrconn, cmd.data, 0, NULL);
2622 : :
1964 akapila@postgresql.o 2623 [ + + ]: 288 : if (res->status == WALRCV_OK_COMMAND)
2624 : : {
2625 : : /* NOTICE. Success. */
2626 [ + + ]: 285 : ereport(NOTICE,
2627 : : (errmsg("dropped replication slot \"%s\" on publisher",
2628 : : slotname)));
2629 : : }
2630 [ + - + + ]: 3 : else if (res->status == WALRCV_ERROR &&
2631 : 2 : missing_ok &&
2632 [ + - ]: 2 : res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
2633 : : {
2634 : : /* LOG. Error, but missing_ok = true. */
2635 [ + - ]: 2 : ereport(LOG,
2636 : : (errmsg("could not drop replication slot \"%s\" on publisher: %s",
2637 : : slotname, res->err)));
2638 : : }
2639 : : else
2640 : : {
2641 : : /* ERROR. */
2642 [ + - ]: 1 : ereport(ERROR,
2643 : : (errcode(ERRCODE_CONNECTION_FAILURE),
2644 : : errmsg("could not drop replication slot \"%s\" on publisher: %s",
2645 : : slotname, res->err)));
2646 : : }
2647 : :
3386 peter_e@gmx.net 2648 : 287 : walrcv_clear_result(res);
2649 : : }
2433 peter@eisentraut.org 2650 : 1 : PG_FINALLY();
2651 : : {
1964 akapila@postgresql.o 2652 : 288 : pfree(cmd.data);
2653 : : }
3401 fujii@postgresql.org 2654 [ + + ]: 288 : PG_END_TRY();
3449 peter_e@gmx.net 2655 : 287 : }
2656 : :
2657 : : /*
2658 : : * Internal workhorse for changing a subscription owner
2659 : : */
2660 : : static void
2661 : 11 : AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
2662 : : {
2663 : : Form_pg_subscription form;
2664 : : AclResult aclresult;
2665 : :
2666 : 11 : form = (Form_pg_subscription) GETSTRUCT(tup);
2667 : :
2668 [ + + ]: 11 : if (form->subowner == newOwnerId)
2669 : 2 : return;
2670 : :
1325 peter@eisentraut.org 2671 [ - + ]: 9 : if (!object_ownercheck(SubscriptionRelationId, form->oid, GetUserId()))
3132 peter_e@gmx.net 2672 :UBC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
3449 2673 : 0 : NameStr(form->subname));
2674 : :
2675 : : /*
2676 : : * Don't allow non-superuser modification of a subscription with
2677 : : * password_required=false.
2678 : : */
1188 rhaas@postgresql.org 2679 [ - + - - ]:CBC 9 : if (!form->subpasswordrequired && !superuser())
3449 peter_e@gmx.net 2680 [ # # ]:UBC 0 : ereport(ERROR,
2681 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2682 : : errmsg("password_required=false is superuser-only"),
2683 : : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
2684 : :
2685 : : /* Must be able to become new owner */
1188 rhaas@postgresql.org 2686 :CBC 9 : check_can_set_role(GetUserId(), newOwnerId);
2687 : :
2688 : : /*
2689 : : * current owner must have CREATE on database
2690 : : *
2691 : : * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
2692 : : * other object types behave differently (e.g. you can't give a table to a
2693 : : * user who lacks CREATE privileges on a schema).
2694 : : */
2695 : 5 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
2696 : : GetUserId(), ACL_CREATE);
2697 [ - + ]: 5 : if (aclresult != ACLCHECK_OK)
1188 rhaas@postgresql.org 2698 :UBC 0 : aclcheck_error(aclresult, OBJECT_DATABASE,
2699 : 0 : get_database_name(MyDatabaseId));
2700 : :
2701 : : /*
2702 : : * If the subscription uses a server, check that the new owner has USAGE
2703 : : * privileges on the server and that a user mapping exists. Note: does not
2704 : : * re-check the resulting connection string.
2705 : : */
116 jdavis@postgresql.or 2706 [ - + ]:GNC 5 : if (OidIsValid(form->subserver))
2707 : : {
98 jdavis@postgresql.or 2708 :UNC 0 : ForeignServer *server = GetForeignServer(form->subserver);
2709 : :
2710 : 0 : aclresult = object_aclcheck(ForeignServerRelationId, server->serverid, newOwnerId, ACL_USAGE);
116 2711 [ # # ]: 0 : if (aclresult != ACLCHECK_OK)
2712 [ # # ]: 0 : ereport(ERROR,
2713 : : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2714 : : errmsg("new subscription owner \"%s\" does not have permission on foreign server \"%s\"",
2715 : : GetUserNameFromId(newOwnerId, false),
2716 : : server->servername));
2717 : :
2718 : : /* make sure a user mapping exists */
98 2719 : 0 : GetUserMapping(newOwnerId, server->serverid);
2720 : : }
2721 : :
3449 peter_e@gmx.net 2722 :CBC 5 : form->subowner = newOwnerId;
3437 alvherre@alvh.no-ip. 2723 : 5 : CatalogTupleUpdate(rel, &tup->t_self, tup);
2724 : :
2725 : : /* Update owner dependency reference */
3449 peter_e@gmx.net 2726 : 5 : changeDependencyOnOwner(SubscriptionRelationId,
2727 : : form->oid,
2728 : : newOwnerId);
2729 : :
2730 [ - + ]: 5 : InvokeObjectPostAlterHook(SubscriptionRelationId,
2731 : : form->oid, 0);
2732 : :
2733 : : /* Wake up related background processes to handle this change quickly. */
1635 jdavis@postgresql.or 2734 : 5 : ApplyLauncherWakeupAtCommit();
1271 tgl@sss.pgh.pa.us 2735 : 5 : LogicalRepWorkersWakeupAtCommit(form->oid);
2736 : : }
2737 : :
2738 : : /*
2739 : : * Change subscription owner -- by name
2740 : : */
2741 : : ObjectAddress
3449 peter_e@gmx.net 2742 : 11 : AlterSubscriptionOwner(const char *name, Oid newOwnerId)
2743 : : {
2744 : : Oid subid;
2745 : : HeapTuple tup;
2746 : : Relation rel;
2747 : : ObjectAddress address;
2748 : : Form_pg_subscription form;
2749 : :
2717 andres@anarazel.de 2750 : 11 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
2751 : :
326 peter@eisentraut.org 2752 :GNC 11 : tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
2753 : : CStringGetDatum(name));
2754 : :
3449 peter_e@gmx.net 2755 [ - + ]:CBC 11 : if (!HeapTupleIsValid(tup))
3449 peter_e@gmx.net 2756 [ # # ]:UBC 0 : ereport(ERROR,
2757 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
2758 : : errmsg("subscription \"%s\" does not exist", name)));
2759 : :
2779 andres@anarazel.de 2760 :CBC 11 : form = (Form_pg_subscription) GETSTRUCT(tup);
2761 : 11 : subid = form->oid;
2762 : :
3449 peter_e@gmx.net 2763 : 11 : AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
2764 : :
2765 : 7 : ObjectAddressSet(address, SubscriptionRelationId, subid);
2766 : :
2767 : 7 : heap_freetuple(tup);
2768 : :
2717 andres@anarazel.de 2769 : 7 : table_close(rel, RowExclusiveLock);
2770 : :
3449 peter_e@gmx.net 2771 : 7 : return address;
2772 : : }
2773 : :
2774 : : /*
2775 : : * Change subscription owner -- by OID
2776 : : */
2777 : : void
3449 peter_e@gmx.net 2778 :UBC 0 : AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
2779 : : {
2780 : : HeapTuple tup;
2781 : : Relation rel;
2782 : :
2717 andres@anarazel.de 2783 : 0 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
2784 : :
3449 peter_e@gmx.net 2785 : 0 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
2786 : :
2787 [ # # ]: 0 : if (!HeapTupleIsValid(tup))
2788 [ # # ]: 0 : ereport(ERROR,
2789 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
2790 : : errmsg("subscription with OID %u does not exist", subid)));
2791 : :
2792 : 0 : AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
2793 : :
2794 : 0 : heap_freetuple(tup);
2795 : :
2717 andres@anarazel.de 2796 : 0 : table_close(rel, RowExclusiveLock);
3449 peter_e@gmx.net 2797 : 0 : }
2798 : :
2799 : : /*
2800 : : * Check and log a warning if the publisher has subscribed to the same table,
2801 : : * its partition ancestors (if it's a partition), or its partition children (if
2802 : : * it's a partitioned table), from some other publishers. This check is
2803 : : * required in the following scenarios:
2804 : : *
2805 : : * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
2806 : : * statements with "copy_data = true" and "origin = none":
2807 : : * - Warn the user that data with an origin might have been copied.
2808 : : * - This check is skipped for tables already added, as incremental sync via
2809 : : * WAL allows origin tracking. The list of such tables is in
2810 : : * subrel_local_oids.
2811 : : *
2812 : : * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
2813 : : * statements with "retain_dead_tuples = true" and "origin = any", and for
2814 : : * ALTER SUBSCRIPTION statements that modify retain_dead_tuples or origin,
2815 : : * or when the publisher's status changes (e.g., due to a connection string
2816 : : * update):
2817 : : * - Warn the user that only conflict detection info for local changes on
2818 : : * the publisher is retained. Data from other origins may lack sufficient
2819 : : * details for reliable conflict detection.
2820 : : * - See comments atop worker.c for more details.
2821 : : */
2822 : : static void
250 akapila@postgresql.o 2823 :GNC 177 : check_publications_origin_tables(WalReceiverConn *wrconn, List *publications,
2824 : : bool copydata, bool retain_dead_tuples,
2825 : : char *origin, Oid *subrel_local_oids,
2826 : : int subrel_count, char *subname)
2827 : : {
2828 : : WalRcvExecResult *res;
2829 : : StringInfoData cmd;
2830 : : TupleTableSlot *slot;
1391 akapila@postgresql.o 2831 :CBC 177 : Oid tableRow[1] = {TEXTOID};
2832 : 177 : List *publist = NIL;
2833 : : int i;
2834 : : bool check_rdt;
2835 : : bool check_table_sync;
342 akapila@postgresql.o 2836 [ + - + + ]:GNC 354 : bool origin_none = origin &&
2837 : 177 : pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0;
2838 : :
2839 : : /*
2840 : : * Enable retain_dead_tuples checks only when origin is set to 'any',
2841 : : * since with origin='none' only local changes are replicated to the
2842 : : * subscriber.
2843 : : */
2844 [ + + + + ]: 177 : check_rdt = retain_dead_tuples && !origin_none;
2845 : :
2846 : : /*
2847 : : * Enable table synchronization checks only when origin is 'none', to
2848 : : * ensure that data from other origins is not inadvertently copied.
2849 : : */
2850 [ + + + + ]: 177 : check_table_sync = copydata && origin_none;
2851 : :
2852 : : /* retain_dead_tuples and table sync checks occur separately */
2853 [ + + - + ]: 177 : Assert(!(check_rdt && check_table_sync));
2854 : :
2855 : : /* Return if no checks are required */
2856 [ + + + + ]: 177 : if (!check_rdt && !check_table_sync)
1391 akapila@postgresql.o 2857 :CBC 163 : return;
2858 : :
2859 : 14 : initStringInfo(&cmd);
2860 : 14 : appendStringInfoString(&cmd,
2861 : : "SELECT DISTINCT P.pubname AS pubname\n"
2862 : : "FROM pg_publication P,\n"
2863 : : " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
2864 : : " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid OR"
2865 : : " GPT.relid IN (SELECT relid FROM pg_partition_ancestors(PS.srrelid) UNION"
2866 : : " SELECT relid FROM pg_partition_tree(PS.srrelid))),\n"
2867 : : " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2868 : : "WHERE C.oid = GPT.relid AND P.pubname IN (");
613 michael@paquier.xyz 2869 : 14 : GetPublicationsStr(publications, &cmd, true);
1391 akapila@postgresql.o 2870 : 14 : appendStringInfoString(&cmd, ")\n");
2871 : :
2872 : : /*
2873 : : * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
2874 : : * subrel_local_oids contains the list of relation oids that are already
2875 : : * present on the subscriber. This check should be skipped for these
2876 : : * tables if checking for table sync scenario. However, when handling the
2877 : : * retain_dead_tuples scenario, ensure all tables are checked, as some
2878 : : * existing tables may now include changes from other origins due to newly
2879 : : * created subscriptions on the publisher.
2880 : : */
342 akapila@postgresql.o 2881 [ + + ]:GNC 14 : if (check_table_sync)
2882 : : {
2883 [ + + ]: 14 : for (i = 0; i < subrel_count; i++)
2884 : : {
2885 : 4 : Oid relid = subrel_local_oids[i];
2886 : 4 : char *schemaname = get_namespace_name(get_rel_namespace(relid));
2887 : 4 : char *tablename = get_rel_name(relid);
50 noah@leadboat.com 2888 : 4 : char *schemaname_lit = quote_literal_cstr(schemaname);
2889 : 4 : char *tablename_lit = quote_literal_cstr(tablename);
2890 : :
2891 : 4 : appendStringInfo(&cmd, "AND NOT (N.nspname = %s AND C.relname = %s)\n",
2892 : : schemaname_lit, tablename_lit);
2893 : :
2894 : 4 : pfree(schemaname_lit);
2895 : 4 : pfree(tablename_lit);
2896 : : }
2897 : : }
2898 : :
1391 akapila@postgresql.o 2899 :CBC 14 : res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
2900 : 14 : pfree(cmd.data);
2901 : :
2902 [ - + ]: 14 : if (res->status != WALRCV_OK_TUPLES)
1391 akapila@postgresql.o 2903 [ # # ]:UBC 0 : ereport(ERROR,
2904 : : (errcode(ERRCODE_CONNECTION_FAILURE),
2905 : : errmsg("could not receive list of replicated tables from the publisher: %s",
2906 : : res->err)));
2907 : :
2908 : : /* Process publications. */
1391 akapila@postgresql.o 2909 :CBC 14 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2910 [ + + ]: 19 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2911 : : {
2912 : : char *pubname;
2913 : : bool isnull;
2914 : :
2915 : 5 : pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2916 [ - + ]: 5 : Assert(!isnull);
2917 : :
2918 : 5 : ExecClearTuple(slot);
2919 : 5 : publist = list_append_unique(publist, makeString(pubname));
2920 : : }
2921 : :
2922 : : /*
2923 : : * Log a warning if the publisher has subscribed to the same table from
2924 : : * some other publisher. We cannot know the origin of data during the
2925 : : * initial sync. Data origins can be found only from the WAL by looking at
2926 : : * the origin id.
2927 : : *
2928 : : * XXX: For simplicity, we don't check whether the table has any data or
2929 : : * not. If the table doesn't have any data then we don't need to
2930 : : * distinguish between data having origin and data not having origin so we
2931 : : * can avoid logging a warning for table sync scenario.
2932 : : */
1391 akapila@postgresql.o 2933 [ + + ]:GNC 14 : if (publist)
2934 : : {
2935 : : StringInfoData pubnames;
2936 : :
2937 : : /* Prepare the list of publication(s) for warning message. */
235 drowley@postgresql.o 2938 : 5 : initStringInfo(&pubnames);
2939 : 5 : GetPublicationsStr(publist, &pubnames, false);
2940 : :
342 akapila@postgresql.o 2941 [ + + ]: 5 : if (check_table_sync)
235 drowley@postgresql.o 2942 [ + - ]: 4 : ereport(WARNING,
2943 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2944 : : errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
2945 : : subname),
2946 : : errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2947 : : "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2948 : : list_length(publist), pubnames.data),
2949 : : errhint("Verify that initial data copied from the publisher tables did not come from other origins."));
2950 : : else
2951 [ + - ]: 1 : ereport(WARNING,
2952 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2953 : : errmsg("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins",
2954 : : subname),
2955 : : errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2956 : : "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2957 : : list_length(publist), pubnames.data),
2958 : : errhint("Consider using origin = NONE or disabling retain_dead_tuples."));
2959 : : }
2960 : :
1391 akapila@postgresql.o 2961 : 14 : ExecDropSingleTupleTableSlot(slot);
2962 : :
2963 : 14 : walrcv_clear_result(res);
2964 : : }
2965 : :
2966 : : /*
2967 : : * This function is similar to check_publications_origin_tables and serves
2968 : : * same purpose for sequences.
2969 : : */
2970 : : static void
250 2971 : 167 : check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications,
2972 : : bool copydata, char *origin,
2973 : : Oid *subrel_local_oids, int subrel_count,
2974 : : char *subname)
2975 : : {
2976 : : WalRcvExecResult *res;
2977 : : StringInfoData cmd;
2978 : : TupleTableSlot *slot;
2979 : 167 : Oid tableRow[1] = {TEXTOID};
2980 : 167 : List *publist = NIL;
2981 : :
2982 : : /*
2983 : : * Enable sequence synchronization checks only when origin is 'none' , to
2984 : : * ensure that sequence data from other origins is not inadvertently
2985 : : * copied. This check is necessary if the publisher is running PG19 or
2986 : : * later, where logical replication sequence synchronization is supported.
2987 : : */
188 fujii@postgresql.org 2988 [ + + + + : 177 : if (!copydata || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0 ||
- + ]
2989 : 10 : walrcv_server_version(wrconn) < 190000)
250 akapila@postgresql.o 2990 : 157 : return;
2991 : :
2992 : 10 : initStringInfo(&cmd);
2993 : 10 : appendStringInfoString(&cmd,
2994 : : "SELECT DISTINCT P.pubname AS pubname\n"
2995 : : "FROM pg_publication P,\n"
2996 : : " LATERAL pg_get_publication_sequences(P.pubname) GPS\n"
2997 : : " JOIN pg_subscription_rel PS ON (GPS.relid = PS.srrelid),\n"
2998 : : " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2999 : : "WHERE C.oid = GPS.relid AND P.pubname IN (");
3000 : :
3001 : 10 : GetPublicationsStr(publications, &cmd, true);
3002 : 10 : appendStringInfoString(&cmd, ")\n");
3003 : :
3004 : : /*
3005 : : * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
3006 : : * subrel_local_oids contains the list of relations that are already
3007 : : * present on the subscriber. This check should be skipped as these will
3008 : : * not be re-synced.
3009 : : */
3010 [ - + ]: 10 : for (int i = 0; i < subrel_count; i++)
3011 : : {
250 akapila@postgresql.o 3012 :UNC 0 : Oid relid = subrel_local_oids[i];
3013 : 0 : char *schemaname = get_namespace_name(get_rel_namespace(relid));
3014 : 0 : char *seqname = get_rel_name(relid);
50 noah@leadboat.com 3015 : 0 : char *schemaname_lit = quote_literal_cstr(schemaname);
3016 : 0 : char *seqname_lit = quote_literal_cstr(seqname);
3017 : :
250 akapila@postgresql.o 3018 : 0 : appendStringInfo(&cmd,
3019 : : "AND NOT (N.nspname = %s AND C.relname = %s)\n",
3020 : : schemaname_lit, seqname_lit);
3021 : :
50 noah@leadboat.com 3022 : 0 : pfree(schemaname_lit);
3023 : 0 : pfree(seqname_lit);
3024 : : }
3025 : :
250 akapila@postgresql.o 3026 :GNC 10 : res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
3027 : 10 : pfree(cmd.data);
3028 : :
3029 [ - + ]: 10 : if (res->status != WALRCV_OK_TUPLES)
250 akapila@postgresql.o 3030 [ # # ]:UNC 0 : ereport(ERROR,
3031 : : (errcode(ERRCODE_CONNECTION_FAILURE),
3032 : : errmsg("could not receive list of replicated sequences from the publisher: %s",
3033 : : res->err)));
3034 : :
3035 : : /* Process publications. */
250 akapila@postgresql.o 3036 :GNC 10 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
3037 [ - + ]: 10 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
3038 : : {
3039 : : char *pubname;
3040 : : bool isnull;
3041 : :
250 akapila@postgresql.o 3042 :UNC 0 : pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
3043 [ # # ]: 0 : Assert(!isnull);
3044 : :
3045 : 0 : ExecClearTuple(slot);
3046 : 0 : publist = list_append_unique(publist, makeString(pubname));
3047 : : }
3048 : :
3049 : : /*
3050 : : * Log a warning if the publisher has subscribed to the same sequence from
3051 : : * some other publisher. We cannot know the origin of sequences data
3052 : : * during the initial sync.
3053 : : */
250 akapila@postgresql.o 3054 [ - + ]:CBC 10 : if (publist)
3055 : : {
3056 : : StringInfoData pubnames;
3057 : :
3058 : : /* Prepare the list of publication(s) for warning message. */
235 drowley@postgresql.o 3059 :UNC 0 : initStringInfo(&pubnames);
3060 : 0 : GetPublicationsStr(publist, &pubnames, false);
3061 : :
250 akapila@postgresql.o 3062 [ # # ]:LBC (4) : ereport(WARNING,
3063 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3064 : : errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
3065 : : subname),
3066 : : errdetail_plural("The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.",
3067 : : "The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.",
3068 : : list_length(publist), pubnames.data),
3069 : : errhint("Verify that initial data copied from the publisher sequences did not come from other origins."));
3070 : : }
3071 : :
250 akapila@postgresql.o 3072 :CBC 10 : ExecDropSingleTupleTableSlot(slot);
3073 : :
3074 : 10 : walrcv_clear_result(res);
3075 : : }
3076 : :
3077 : : /*
3078 : : * Determine whether the retain_dead_tuples can be enabled based on the
3079 : : * publisher's status.
3080 : : *
3081 : : * This option is disallowed if the publisher is running a version earlier
3082 : : * than the PG19, or if the publisher is in recovery (i.e., it is a standby
3083 : : * server).
3084 : : *
3085 : : * See comments atop worker.c for a detailed explanation.
3086 : : */
3087 : : static void
342 akapila@postgresql.o 3088 :GNC 12 : check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
3089 : : {
3090 : : WalRcvExecResult *res;
3091 : 12 : Oid RecoveryRow[1] = {BOOLOID};
3092 : : TupleTableSlot *slot;
3093 : : bool isnull;
3094 : : bool remote_in_recovery;
3095 : :
188 fujii@postgresql.org 3096 [ - + ]: 12 : if (walrcv_server_version(wrconn) < 190000)
342 akapila@postgresql.o 3097 [ # # ]:UNC 0 : ereport(ERROR,
3098 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3099 : : errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
3100 : :
342 akapila@postgresql.o 3101 :GNC 12 : res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow);
3102 : :
3103 [ - + ]: 12 : if (res->status != WALRCV_OK_TUPLES)
342 akapila@postgresql.o 3104 [ # # ]:UNC 0 : ereport(ERROR,
3105 : : (errcode(ERRCODE_CONNECTION_FAILURE),
3106 : : errmsg("could not obtain recovery progress from the publisher: %s",
3107 : : res->err)));
3108 : :
342 akapila@postgresql.o 3109 :GNC 12 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
3110 [ - + ]: 12 : if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
342 akapila@postgresql.o 3111 [ # # ]:UNC 0 : elog(ERROR, "failed to fetch tuple for the recovery progress");
3112 : :
342 akapila@postgresql.o 3113 :GNC 12 : remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull));
3114 : :
3115 [ - + ]: 12 : if (remote_in_recovery)
342 akapila@postgresql.o 3116 [ # # ]:UNC 0 : ereport(ERROR,
3117 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3118 : : errmsg("cannot enable retain_dead_tuples if the publisher is in recovery"));
3119 : :
342 akapila@postgresql.o 3120 :GNC 12 : ExecDropSingleTupleTableSlot(slot);
3121 : :
3122 : 12 : walrcv_clear_result(res);
3123 : 12 : }
3124 : :
3125 : : /*
3126 : : * Check if the subscriber's configuration is adequate to enable the
3127 : : * retain_dead_tuples option.
3128 : : *
3129 : : * Issue an ERROR if the wal_level does not support the use of replication
3130 : : * slots when check_guc is set to true.
3131 : : *
3132 : : * Issue a WARNING if track_commit_timestamp is not enabled when check_guc is
3133 : : * set to true. This is only to highlight the importance of enabling
3134 : : * track_commit_timestamp instead of catching all the misconfigurations, as
3135 : : * this setting can be adjusted after subscription creation. Without it, the
3136 : : * apply worker will simply skip conflict detection.
3137 : : *
3138 : : * Issue a WARNING or NOTICE if the subscription is disabled and the retention
3139 : : * is active. Do not raise an ERROR since users can only modify
3140 : : * retain_dead_tuples for disabled subscriptions. And as long as the
3141 : : * subscription is enabled promptly, it will not pose issues.
3142 : : *
3143 : : * Issue a NOTICE to inform users that max_retention_duration is
3144 : : * ineffective when retain_dead_tuples is disabled for a subscription. An ERROR
3145 : : * is not issued because setting max_retention_duration causes no harm,
3146 : : * even when it is ineffective.
3147 : : */
3148 : : void
3149 : 296 : CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
3150 : : int elevel_for_sub_disabled,
3151 : : bool retain_dead_tuples, bool retention_active,
3152 : : bool max_retention_set)
3153 : : {
3154 [ + + - + ]: 296 : Assert(elevel_for_sub_disabled == NOTICE ||
3155 : : elevel_for_sub_disabled == WARNING);
3156 : :
301 3157 [ + + ]: 296 : if (retain_dead_tuples)
3158 : : {
3159 [ + + - + ]: 17 : if (check_guc && wal_level < WAL_LEVEL_REPLICA)
301 akapila@postgresql.o 3160 [ # # ]:UNC 0 : ereport(ERROR,
3161 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3162 : : errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
3163 : : errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
3164 : :
301 akapila@postgresql.o 3165 [ + + + + ]:GNC 17 : if (check_guc && !track_commit_timestamp)
3166 [ + - ]: 4 : ereport(WARNING,
3167 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3168 : : errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
3169 : : errhint("Consider setting \"%s\" to true.",
3170 : : "track_commit_timestamp"));
3171 : :
3172 [ + + + - ]: 17 : if (sub_disabled && retention_active)
3173 [ + - + + ]: 7 : ereport(elevel_for_sub_disabled,
3174 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3175 : : errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
3176 : : (elevel_for_sub_disabled > NOTICE)
3177 : : ? errhint("Consider setting %s to false.",
3178 : : "retain_dead_tuples") : 0);
3179 : : }
3180 [ + + ]: 279 : else if (max_retention_set)
3181 : : {
3182 [ + - ]: 4 : ereport(NOTICE,
3183 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3184 : : errmsg("max_retention_duration is ineffective when retain_dead_tuples is disabled"));
3185 : : }
342 3186 : 296 : }
3187 : :
3188 : : /*
3189 : : * Return true iff 'rv' is a member of the list.
3190 : : */
3191 : : static bool
250 3192 : 287 : list_member_rangevar(const List *list, RangeVar *rv)
3193 : : {
3194 [ + + + + : 1044 : foreach_ptr(PublicationRelKind, relinfo, list)
+ + ]
3195 : : {
3196 [ + + ]: 472 : if (equal(relinfo->rv, rv))
3197 : 1 : return true;
3198 : : }
3199 : :
3200 : 286 : return false;
3201 : : }
3202 : :
3203 : : /*
3204 : : * Get the list of tables and sequences which belong to specified publications
3205 : : * on the publisher connection.
3206 : : *
3207 : : * Note that we don't support the case where the column list is different for
3208 : : * the same table in different publications to avoid sending unwanted column
3209 : : * information for some of the rows. This can happen when both the column
3210 : : * list and row filter are specified for different publications.
3211 : : */
3212 : : static List *
3213 : 164 : fetch_relation_list(WalReceiverConn *wrconn, List *publications)
3214 : : {
3215 : : WalRcvExecResult *res;
3216 : : StringInfoData cmd;
3217 : : TupleTableSlot *slot;
3218 : 164 : Oid tableRow[4] = {TEXTOID, TEXTOID, CHAROID, InvalidOid};
3219 : 164 : List *relationlist = NIL;
1189 akapila@postgresql.o 3220 :CBC 164 : int server_version = walrcv_server_version(wrconn);
3221 : 164 : bool check_columnlist = (server_version >= 150000);
250 akapila@postgresql.o 3222 [ + - ]:GNC 164 : int column_count = check_columnlist ? 4 : 3;
3223 : : StringInfoData pub_names;
3224 : :
3386 peter_e@gmx.net 3225 :CBC 164 : initStringInfo(&cmd);
236 drowley@postgresql.o 3226 :GNC 164 : initStringInfo(&pub_names);
3227 : :
3228 : : /* Build the pub_names comma-separated string. */
3229 : 164 : GetPublicationsStr(publications, &pub_names, true);
3230 : :
3231 : : /* Get the list of relations from the publisher */
1189 akapila@postgresql.o 3232 [ + - ]:CBC 164 : if (server_version >= 160000)
3233 : : {
250 akapila@postgresql.o 3234 :GNC 164 : tableRow[3] = INT2VECTOROID;
3235 : :
3236 : : /*
3237 : : * From version 16, we allowed passing multiple publications to the
3238 : : * function pg_get_publication_tables. This helped to filter out the
3239 : : * partition table whose ancestor is also published in this
3240 : : * publication array.
3241 : : *
3242 : : * Join pg_get_publication_tables with pg_publication to exclude
3243 : : * non-existing publications.
3244 : : *
3245 : : * Note that attrs are always stored in sorted order so we don't need
3246 : : * to worry if different publications have specified them in a
3247 : : * different order. See pub_collist_validate.
3248 : : */
3249 : 164 : appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, c.relkind, gpt.attrs\n"
3250 : : " FROM pg_class c\n"
3251 : : " JOIN pg_namespace n ON n.oid = c.relnamespace\n"
3252 : : " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
3253 : : " FROM pg_publication\n"
3254 : : " WHERE pubname IN ( %s )) AS gpt\n"
3255 : : " ON gpt.relid = c.oid\n",
3256 : : pub_names.data);
3257 : :
3258 : : /* From version 19, inclusion of sequences in the target is supported */
3259 [ + - ]: 164 : if (server_version >= 190000)
3260 : 164 : appendStringInfo(&cmd,
3261 : : "UNION ALL\n"
3262 : : " SELECT DISTINCT s.schemaname, s.sequencename, " CppAsString2(RELKIND_SEQUENCE) "::\"char\" AS relkind, NULL::int2vector AS attrs\n"
3263 : : " FROM pg_catalog.pg_publication_sequences s\n"
3264 : : " WHERE s.pubname IN ( %s )",
3265 : : pub_names.data);
3266 : : }
3267 : : else
3268 : : {
250 akapila@postgresql.o 3269 :UNC 0 : tableRow[3] = NAMEARRAYOID;
3270 : 0 : appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, " CppAsString2(RELKIND_RELATION) "::\"char\" AS relkind \n");
3271 : :
3272 : : /* Get column lists for each relation if the publisher supports it */
1189 akapila@postgresql.o 3273 [ # # ]:UBC 0 : if (check_columnlist)
3274 : 0 : appendStringInfoString(&cmd, ", t.attnames\n");
3275 : :
613 michael@paquier.xyz 3276 : 0 : appendStringInfo(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
3277 : : " WHERE t.pubname IN ( %s )",
3278 : : pub_names.data);
3279 : : }
3280 : :
236 drowley@postgresql.o 3281 :GNC 164 : pfree(pub_names.data);
3282 : :
250 akapila@postgresql.o 3283 : 164 : res = walrcv_exec(wrconn, cmd.data, column_count, tableRow);
3386 peter_e@gmx.net 3284 :CBC 164 : pfree(cmd.data);
3285 : :
3286 [ - + ]: 164 : if (res->status != WALRCV_OK_TUPLES)
3386 peter_e@gmx.net 3287 [ # # ]:UBC 0 : ereport(ERROR,
3288 : : (errcode(ERRCODE_CONNECTION_FAILURE),
3289 : : errmsg("could not receive list of replicated tables from the publisher: %s",
3290 : : res->err)));
3291 : :
3292 : : /* Process tables. */
2784 andres@anarazel.de 3293 :CBC 164 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
3386 peter_e@gmx.net 3294 [ + + ]: 467 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
3295 : : {
3296 : : char *nspname;
3297 : : char *relname;
3298 : : bool isnull;
3299 : : char relkind;
250 akapila@postgresql.o 3300 :GNC 304 : PublicationRelKind *relinfo = palloc_object(PublicationRelKind);
3301 : :
3386 peter_e@gmx.net 3302 :CBC 304 : nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
3303 [ - + ]: 304 : Assert(!isnull);
3304 : 304 : relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
3305 [ - + ]: 304 : Assert(!isnull);
250 akapila@postgresql.o 3306 :GNC 304 : relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
3307 [ - + ]: 304 : Assert(!isnull);
3308 : :
3309 : 304 : relinfo->rv = makeRangeVar(nspname, relname, -1);
3310 : 304 : relinfo->relkind = relkind;
3311 : :
3312 [ + + + - ]: 304 : if (relkind != RELKIND_SEQUENCE &&
3313 [ + + ]: 287 : check_columnlist &&
3314 : 287 : list_member_rangevar(relationlist, relinfo->rv))
1489 akapila@postgresql.o 3315 [ + - ]:CBC 1 : ereport(ERROR,
3316 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3317 : : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
3318 : : nspname, relname));
3319 : : else
250 akapila@postgresql.o 3320 :GNC 303 : relationlist = lappend(relationlist, relinfo);
3321 : :
3386 peter_e@gmx.net 3322 :CBC 303 : ExecClearTuple(slot);
3323 : : }
3324 : 163 : ExecDropSingleTupleTableSlot(slot);
3325 : :
3326 : 163 : walrcv_clear_result(res);
3327 : :
250 akapila@postgresql.o 3328 :GNC 163 : return relationlist;
3329 : : }
3330 : :
3331 : : /*
3332 : : * This is to report the connection failure while dropping replication slots.
3333 : : * Here, we report the WARNING for all tablesync slots so that user can drop
3334 : : * them manually, if required.
3335 : : */
3336 : : static void
1964 akapila@postgresql.o 3337 :GBC 4 : ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
3338 : : {
3339 : : ListCell *lc;
3340 : :
3341 [ - + - - : 4 : foreach(lc, rstates)
- + ]
3342 : : {
1964 akapila@postgresql.o 3343 :UBC 0 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
3344 : 0 : Oid relid = rstate->relid;
3345 : :
3346 : : /* Only cleanup resources of tablesync workers */
3347 [ # # ]: 0 : if (!OidIsValid(relid))
3348 : 0 : continue;
3349 : :
3350 : : /*
3351 : : * Caller needs to ensure that relstate doesn't change underneath us.
3352 : : * See DropSubscription where we get the relstates.
3353 : : */
3354 [ # # ]: 0 : if (rstate->state != SUBREL_STATE_SYNCDONE)
3355 : : {
3356 : 0 : char syncslotname[NAMEDATALEN] = {0};
3357 : :
1961 3358 : 0 : ReplicationSlotNameForTablesync(subid, relid, syncslotname,
3359 : : sizeof(syncslotname));
1964 3360 [ # # ]: 0 : elog(WARNING, "could not drop tablesync replication slot \"%s\"",
3361 : : syncslotname);
3362 : : }
3363 : : }
3364 : :
1964 akapila@postgresql.o 3365 [ + - ]:GBC 4 : ereport(ERROR,
3366 : : (errcode(ERRCODE_CONNECTION_FAILURE),
3367 : : errmsg("could not connect to publisher when attempting to drop replication slot \"%s\": %s",
3368 : : slotname, err),
3369 : : /* translator: %s is an SQL ALTER command */
3370 : : errhint("Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
3371 : : "ALTER SUBSCRIPTION ... DISABLE",
3372 : : "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
3373 : : }
3374 : :
3375 : : /*
3376 : : * Check for duplicates in the given list of publications and error out if
3377 : : * found one. Add publications to datums as text datums, if datums is not
3378 : : * NULL.
3379 : : */
3380 : : static void
1911 peter@eisentraut.org 3381 :CBC 268 : check_duplicates_in_publist(List *publist, Datum *datums)
3382 : : {
3383 : : ListCell *cell;
3384 : 268 : int j = 0;
3385 : :
3386 [ + - + + : 610 : foreach(cell, publist)
+ + ]
3387 : : {
3388 : 354 : char *name = strVal(lfirst(cell));
3389 : : ListCell *pcell;
3390 : :
3391 [ + - + - : 517 : foreach(pcell, publist)
+ - ]
3392 : : {
3393 : 517 : char *pname = strVal(lfirst(pcell));
3394 : :
3395 [ + + ]: 517 : if (pcell == cell)
3396 : 342 : break;
3397 : :
3398 [ + + ]: 175 : if (strcmp(name, pname) == 0)
3399 [ + - ]: 12 : ereport(ERROR,
3400 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
3401 : : errmsg("publication name \"%s\" used more than once",
3402 : : pname)));
3403 : : }
3404 : :
3405 [ + + ]: 342 : if (datums)
3406 : 286 : datums[j++] = CStringGetTextDatum(name);
3407 : : }
3408 : 256 : }
3409 : :
3410 : : /*
3411 : : * Merge current subscription's publications and user-specified publications
3412 : : * from ADD/DROP PUBLICATIONS.
3413 : : *
3414 : : * If addpub is true, we will add the list of publications into oldpublist.
3415 : : * Otherwise, we will delete the list of publications from oldpublist. The
3416 : : * returned list is a copy, oldpublist itself is not changed.
3417 : : *
3418 : : * subname is the subscription name, for error messages.
3419 : : */
3420 : : static List *
3421 : 35 : merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
3422 : : {
3423 : : ListCell *lc;
3424 : :
3425 : 35 : oldpublist = list_copy(oldpublist);
3426 : :
3427 : 35 : check_duplicates_in_publist(newpublist, NULL);
3428 : :
3429 [ + - + + : 59 : foreach(lc, newpublist)
+ + ]
3430 : : {
3431 : 44 : char *name = strVal(lfirst(lc));
3432 : : ListCell *lc2;
3433 : 44 : bool found = false;
3434 : :
3435 [ + - + + : 86 : foreach(lc2, oldpublist)
+ + ]
3436 : : {
3437 : 71 : char *pubname = strVal(lfirst(lc2));
3438 : :
3439 [ + + ]: 71 : if (strcmp(name, pubname) == 0)
3440 : : {
3441 : 29 : found = true;
3442 [ + + ]: 29 : if (addpub)
3443 [ + - ]: 8 : ereport(ERROR,
3444 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
3445 : : errmsg("publication \"%s\" is already in subscription \"%s\"",
3446 : : name, subname)));
3447 : : else
3448 : 21 : oldpublist = foreach_delete_current(oldpublist, lc2);
3449 : :
3450 : 21 : break;
3451 : : }
3452 : : }
3453 : :
3454 [ + + + - ]: 36 : if (addpub && !found)
3455 : 11 : oldpublist = lappend(oldpublist, makeString(name));
3456 [ + - + + ]: 25 : else if (!addpub && !found)
3457 [ + - ]: 4 : ereport(ERROR,
3458 : : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
3459 : : errmsg("publication \"%s\" is not in subscription \"%s\"",
3460 : : name, subname)));
3461 : : }
3462 : :
3463 : : /*
3464 : : * XXX Probably no strong reason for this, but for now it's to make ALTER
3465 : : * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
3466 : : */
3467 [ + + ]: 15 : if (!oldpublist)
3468 [ + - ]: 4 : ereport(ERROR,
3469 : : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
3470 : : errmsg("cannot drop all the publications from a subscription")));
3471 : :
3472 : 11 : return oldpublist;
3473 : : }
3474 : :
3475 : : /*
3476 : : * Extract the streaming mode value from a DefElem. This is like
3477 : : * defGetBoolean() but also accepts the special value of "parallel".
3478 : : */
3479 : : char
1268 akapila@postgresql.o 3480 : 462 : defGetStreamingMode(DefElem *def)
3481 : : {
3482 : : /*
3483 : : * If no parameter value given, assume "true" is meant.
3484 : : */
3485 [ - + ]: 462 : if (!def->arg)
1268 akapila@postgresql.o 3486 :UBC 0 : return LOGICALREP_STREAM_ON;
3487 : :
3488 : : /*
3489 : : * Allow 0, 1, "false", "true", "off", "on" or "parallel".
3490 : : */
1268 akapila@postgresql.o 3491 [ - + ]:CBC 462 : switch (nodeTag(def->arg))
3492 : : {
1268 akapila@postgresql.o 3493 :UBC 0 : case T_Integer:
3494 [ # # # ]: 0 : switch (intVal(def->arg))
3495 : : {
3496 : 0 : case 0:
3497 : 0 : return LOGICALREP_STREAM_OFF;
3498 : 0 : case 1:
3499 : 0 : return LOGICALREP_STREAM_ON;
3500 : 0 : default:
3501 : : /* otherwise, error out below */
3502 : 0 : break;
3503 : : }
3504 : 0 : break;
1268 akapila@postgresql.o 3505 :CBC 462 : default:
3506 : : {
3507 : 462 : char *sval = defGetString(def);
3508 : :
3509 : : /*
3510 : : * The set of strings accepted here should match up with the
3511 : : * grammar's opt_boolean_or_string production.
3512 : : */
3513 [ + + + + ]: 920 : if (pg_strcasecmp(sval, "false") == 0 ||
3514 : 458 : pg_strcasecmp(sval, "off") == 0)
3515 : 7 : return LOGICALREP_STREAM_OFF;
3516 [ + + + + ]: 898 : if (pg_strcasecmp(sval, "true") == 0 ||
3517 : 443 : pg_strcasecmp(sval, "on") == 0)
3518 : 48 : return LOGICALREP_STREAM_ON;
3519 [ + + ]: 407 : if (pg_strcasecmp(sval, "parallel") == 0)
3520 : 403 : return LOGICALREP_STREAM_PARALLEL;
3521 : : }
3522 : 4 : break;
3523 : : }
3524 : :
3525 [ + - ]: 4 : ereport(ERROR,
3526 : : (errcode(ERRCODE_SYNTAX_ERROR),
3527 : : errmsg("%s requires a Boolean value or \"parallel\"",
3528 : : def->defname)));
3529 : : return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
3530 : : }
|