LCOV - code coverage report
Current view: top level - src/backend/commands - subscriptioncmds.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19beta1 Lines: 88.0 % 1104 972
Test Date: 2026-06-12 21:16:47 Functions: 95.7 % 23 22
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * subscriptioncmds.c
       4              :  *      subscription catalog manipulation functions
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  * IDENTIFICATION
      10              :  *      src/backend/commands/subscriptioncmds.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres.h"
      16              : 
      17              : #include "access/commit_ts.h"
      18              : #include "access/htup_details.h"
      19              : #include "access/table.h"
      20              : #include "access/twophase.h"
      21              : #include "access/xact.h"
      22              : #include "catalog/catalog.h"
      23              : #include "catalog/dependency.h"
      24              : #include "catalog/indexing.h"
      25              : #include "catalog/namespace.h"
      26              : #include "catalog/objectaccess.h"
      27              : #include "catalog/objectaddress.h"
      28              : #include "catalog/pg_authid_d.h"
      29              : #include "catalog/pg_database_d.h"
      30              : #include "catalog/pg_foreign_server.h"
      31              : #include "catalog/pg_subscription.h"
      32              : #include "catalog/pg_subscription_rel.h"
      33              : #include "catalog/pg_type.h"
      34              : #include "catalog/pg_user_mapping.h"
      35              : #include "commands/defrem.h"
      36              : #include "commands/event_trigger.h"
      37              : #include "commands/subscriptioncmds.h"
      38              : #include "executor/executor.h"
      39              : #include "foreign/foreign.h"
      40              : #include "miscadmin.h"
      41              : #include "nodes/makefuncs.h"
      42              : #include "pgstat.h"
      43              : #include "replication/logicallauncher.h"
      44              : #include "replication/logicalworker.h"
      45              : #include "replication/origin.h"
      46              : #include "replication/slot.h"
      47              : #include "replication/walreceiver.h"
      48              : #include "replication/walsender.h"
      49              : #include "replication/worker_internal.h"
      50              : #include "storage/lmgr.h"
      51              : #include "utils/acl.h"
      52              : #include "utils/builtins.h"
      53              : #include "utils/guc.h"
      54              : #include "utils/lsyscache.h"
      55              : #include "utils/memutils.h"
      56              : #include "utils/pg_lsn.h"
      57              : #include "utils/syscache.h"
      58              : 
      59              : /*
      60              :  * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
      61              :  * command.
      62              :  */
      63              : #define SUBOPT_CONNECT              0x00000001
      64              : #define SUBOPT_ENABLED              0x00000002
      65              : #define SUBOPT_CREATE_SLOT          0x00000004
      66              : #define SUBOPT_SLOT_NAME            0x00000008
      67              : #define SUBOPT_COPY_DATA            0x00000010
      68              : #define SUBOPT_SYNCHRONOUS_COMMIT   0x00000020
      69              : #define SUBOPT_REFRESH              0x00000040
      70              : #define SUBOPT_BINARY               0x00000080
      71              : #define SUBOPT_STREAMING            0x00000100
      72              : #define SUBOPT_TWOPHASE_COMMIT      0x00000200
      73              : #define SUBOPT_DISABLE_ON_ERR       0x00000400
      74              : #define SUBOPT_PASSWORD_REQUIRED    0x00000800
      75              : #define SUBOPT_RUN_AS_OWNER         0x00001000
      76              : #define SUBOPT_FAILOVER             0x00002000
      77              : #define SUBOPT_RETAIN_DEAD_TUPLES   0x00004000
      78              : #define SUBOPT_MAX_RETENTION_DURATION   0x00008000
      79              : #define SUBOPT_WAL_RECEIVER_TIMEOUT         0x00010000
      80              : #define SUBOPT_LSN                  0x00020000
      81              : #define SUBOPT_ORIGIN               0x00040000
      82              : 
      83              : /* check if the 'val' has 'bits' set */
      84              : #define IsSet(val, bits)  (((val) & (bits)) == (bits))
      85              : 
      86              : /*
      87              :  * Structure to hold a bitmap representing the user-provided CREATE/ALTER
      88              :  * SUBSCRIPTION command options and the parsed/default values of each of them.
      89              :  */
      90              : typedef struct SubOpts
      91              : {
      92              :     uint32      specified_opts;
      93              :     char       *slot_name;
      94              :     char       *synchronous_commit;
      95              :     bool        connect;
      96              :     bool        enabled;
      97              :     bool        create_slot;
      98              :     bool        copy_data;
      99              :     bool        refresh;
     100              :     bool        binary;
     101              :     char        streaming;
     102              :     bool        twophase;
     103              :     bool        disableonerr;
     104              :     bool        passwordrequired;
     105              :     bool        runasowner;
     106              :     bool        failover;
     107              :     bool        retaindeadtuples;
     108              :     int32       maxretention;
     109              :     char       *origin;
     110              :     XLogRecPtr  lsn;
     111              :     char       *wal_receiver_timeout;
     112              : } SubOpts;
     113              : 
     114              : /*
     115              :  * PublicationRelKind represents a relation included in a publication.
     116              :  * It stores the schema-qualified relation name (rv) and its kind (relkind).
     117              :  */
     118              : typedef struct PublicationRelKind
     119              : {
     120              :     RangeVar   *rv;
     121              :     char        relkind;
     122              : } PublicationRelKind;
     123              : 
     124              : static List *fetch_relation_list(WalReceiverConn *wrconn, List *publications);
     125              : static void check_publications_origin_tables(WalReceiverConn *wrconn,
     126              :                                              List *publications, bool copydata,
     127              :                                              bool retain_dead_tuples,
     128              :                                              char *origin,
     129              :                                              Oid *subrel_local_oids,
     130              :                                              int subrel_count, char *subname);
     131              : static void check_publications_origin_sequences(WalReceiverConn *wrconn,
     132              :                                                 List *publications,
     133              :                                                 bool copydata, char *origin,
     134              :                                                 Oid *subrel_local_oids,
     135              :                                                 int subrel_count,
     136              :                                                 char *subname);
     137              : static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn);
     138              : static void check_duplicates_in_publist(List *publist, Datum *datums);
     139              : static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
     140              : static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
     141              : static void CheckAlterSubOption(Subscription *sub, const char *option,
     142              :                                 bool slot_needs_update, bool isTopLevel);
     143              : 
     144              : 
     145              : /*
     146              :  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
     147              :  *
     148              :  * Since not all options can be specified in both commands, this function
     149              :  * will report an error if mutually exclusive options are specified.
     150              :  */
     151              : static void
     152          624 : parse_subscription_options(ParseState *pstate, List *stmt_options,
     153              :                            uint32 supported_opts, SubOpts *opts)
     154              : {
     155              :     ListCell   *lc;
     156              : 
     157              :     /* Start out with cleared opts. */
     158          624 :     memset(opts, 0, sizeof(SubOpts));
     159              : 
     160              :     /* caller must expect some option */
     161              :     Assert(supported_opts != 0);
     162              : 
     163              :     /* If connect option is supported, these others also need to be. */
     164              :     Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
     165              :            IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
     166              :                  SUBOPT_COPY_DATA));
     167              : 
     168              :     /* Set default values for the supported options. */
     169          624 :     if (IsSet(supported_opts, SUBOPT_CONNECT))
     170          308 :         opts->connect = true;
     171          624 :     if (IsSet(supported_opts, SUBOPT_ENABLED))
     172          366 :         opts->enabled = true;
     173          624 :     if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
     174          308 :         opts->create_slot = true;
     175          624 :     if (IsSet(supported_opts, SUBOPT_COPY_DATA))
     176          395 :         opts->copy_data = true;
     177          624 :     if (IsSet(supported_opts, SUBOPT_REFRESH))
     178           54 :         opts->refresh = true;
     179          624 :     if (IsSet(supported_opts, SUBOPT_BINARY))
     180          464 :         opts->binary = false;
     181          624 :     if (IsSet(supported_opts, SUBOPT_STREAMING))
     182          464 :         opts->streaming = LOGICALREP_STREAM_PARALLEL;
     183          624 :     if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
     184          464 :         opts->twophase = false;
     185          624 :     if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
     186          464 :         opts->disableonerr = false;
     187          624 :     if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED))
     188          464 :         opts->passwordrequired = true;
     189          624 :     if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER))
     190          464 :         opts->runasowner = false;
     191          624 :     if (IsSet(supported_opts, SUBOPT_FAILOVER))
     192          464 :         opts->failover = false;
     193          624 :     if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES))
     194          464 :         opts->retaindeadtuples = false;
     195          624 :     if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION))
     196          464 :         opts->maxretention = 0;
     197          624 :     if (IsSet(supported_opts, SUBOPT_ORIGIN))
     198          464 :         opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
     199              : 
     200              :     /* Parse options */
     201         1257 :     foreach(lc, stmt_options)
     202              :     {
     203          689 :         DefElem    *defel = (DefElem *) lfirst(lc);
     204              : 
     205          689 :         if (IsSet(supported_opts, SUBOPT_CONNECT) &&
     206          409 :             strcmp(defel->defname, "connect") == 0)
     207              :         {
     208          145 :             if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
     209            0 :                 errorConflictingDefElem(defel, pstate);
     210              : 
     211          145 :             opts->specified_opts |= SUBOPT_CONNECT;
     212          145 :             opts->connect = defGetBoolean(defel);
     213              :         }
     214          544 :         else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
     215          322 :                  strcmp(defel->defname, "enabled") == 0)
     216              :         {
     217           82 :             if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
     218            0 :                 errorConflictingDefElem(defel, pstate);
     219              : 
     220           82 :             opts->specified_opts |= SUBOPT_ENABLED;
     221           82 :             opts->enabled = defGetBoolean(defel);
     222              :         }
     223          462 :         else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
     224          240 :                  strcmp(defel->defname, "create_slot") == 0)
     225              :         {
     226           25 :             if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
     227            0 :                 errorConflictingDefElem(defel, pstate);
     228              : 
     229           25 :             opts->specified_opts |= SUBOPT_CREATE_SLOT;
     230           25 :             opts->create_slot = defGetBoolean(defel);
     231              :         }
     232          437 :         else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
     233          373 :                  strcmp(defel->defname, "slot_name") == 0)
     234              :         {
     235          121 :             if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
     236            0 :                 errorConflictingDefElem(defel, pstate);
     237              : 
     238          121 :             opts->specified_opts |= SUBOPT_SLOT_NAME;
     239          121 :             opts->slot_name = defGetString(defel);
     240              : 
     241              :             /* Setting slot_name = NONE is treated as no slot name. */
     242          238 :             if (strcmp(opts->slot_name, "none") == 0)
     243           97 :                 opts->slot_name = NULL;
     244              :             else
     245           24 :                 ReplicationSlotValidateName(opts->slot_name, false, ERROR);
     246              :         }
     247          316 :         else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
     248          198 :                  strcmp(defel->defname, "copy_data") == 0)
     249              :         {
     250           31 :             if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
     251            0 :                 errorConflictingDefElem(defel, pstate);
     252              : 
     253           31 :             opts->specified_opts |= SUBOPT_COPY_DATA;
     254           31 :             opts->copy_data = defGetBoolean(defel);
     255              :         }
     256          285 :         else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
     257          226 :                  strcmp(defel->defname, "synchronous_commit") == 0)
     258              :         {
     259            8 :             if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
     260            0 :                 errorConflictingDefElem(defel, pstate);
     261              : 
     262            8 :             opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
     263            8 :             opts->synchronous_commit = defGetString(defel);
     264              : 
     265              :             /* Test if the given value is valid for synchronous_commit GUC. */
     266            8 :             (void) set_config_option("synchronous_commit", opts->synchronous_commit,
     267              :                                      PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
     268              :                                      false, 0, false);
     269              :         }
     270          277 :         else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
     271           44 :                  strcmp(defel->defname, "refresh") == 0)
     272              :         {
     273           44 :             if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
     274            0 :                 errorConflictingDefElem(defel, pstate);
     275              : 
     276           44 :             opts->specified_opts |= SUBOPT_REFRESH;
     277           44 :             opts->refresh = defGetBoolean(defel);
     278              :         }
     279          233 :         else if (IsSet(supported_opts, SUBOPT_BINARY) &&
     280          218 :                  strcmp(defel->defname, "binary") == 0)
     281              :         {
     282           19 :             if (IsSet(opts->specified_opts, SUBOPT_BINARY))
     283            0 :                 errorConflictingDefElem(defel, pstate);
     284              : 
     285           19 :             opts->specified_opts |= SUBOPT_BINARY;
     286           19 :             opts->binary = defGetBoolean(defel);
     287              :         }
     288          214 :         else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
     289          199 :                  strcmp(defel->defname, "streaming") == 0)
     290              :         {
     291           43 :             if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
     292            0 :                 errorConflictingDefElem(defel, pstate);
     293              : 
     294           43 :             opts->specified_opts |= SUBOPT_STREAMING;
     295           43 :             opts->streaming = defGetStreamingMode(defel);
     296              :         }
     297          171 :         else if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
     298          156 :                  strcmp(defel->defname, "two_phase") == 0)
     299              :         {
     300           24 :             if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
     301            0 :                 errorConflictingDefElem(defel, pstate);
     302              : 
     303           24 :             opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
     304           24 :             opts->twophase = defGetBoolean(defel);
     305              :         }
     306          147 :         else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
     307          132 :                  strcmp(defel->defname, "disable_on_error") == 0)
     308              :         {
     309           13 :             if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
     310            0 :                 errorConflictingDefElem(defel, pstate);
     311              : 
     312           13 :             opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
     313           13 :             opts->disableonerr = defGetBoolean(defel);
     314              :         }
     315          134 :         else if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED) &&
     316          119 :                  strcmp(defel->defname, "password_required") == 0)
     317              :         {
     318           16 :             if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
     319            0 :                 errorConflictingDefElem(defel, pstate);
     320              : 
     321           16 :             opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
     322           16 :             opts->passwordrequired = defGetBoolean(defel);
     323              :         }
     324          118 :         else if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER) &&
     325          103 :                  strcmp(defel->defname, "run_as_owner") == 0)
     326              :         {
     327           11 :             if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
     328            0 :                 errorConflictingDefElem(defel, pstate);
     329              : 
     330           11 :             opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
     331           11 :             opts->runasowner = defGetBoolean(defel);
     332              :         }
     333          107 :         else if (IsSet(supported_opts, SUBOPT_FAILOVER) &&
     334           92 :                  strcmp(defel->defname, "failover") == 0)
     335              :         {
     336           16 :             if (IsSet(opts->specified_opts, SUBOPT_FAILOVER))
     337            0 :                 errorConflictingDefElem(defel, pstate);
     338              : 
     339           16 :             opts->specified_opts |= SUBOPT_FAILOVER;
     340           16 :             opts->failover = defGetBoolean(defel);
     341              :         }
     342           91 :         else if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES) &&
     343           76 :                  strcmp(defel->defname, "retain_dead_tuples") == 0)
     344              :         {
     345           14 :             if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
     346            0 :                 errorConflictingDefElem(defel, pstate);
     347              : 
     348           14 :             opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
     349           14 :             opts->retaindeadtuples = defGetBoolean(defel);
     350              :         }
     351           77 :         else if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION) &&
     352           62 :                  strcmp(defel->defname, "max_retention_duration") == 0)
     353              :         {
     354           22 :             if (IsSet(opts->specified_opts, SUBOPT_MAX_RETENTION_DURATION))
     355            0 :                 errorConflictingDefElem(defel, pstate);
     356              : 
     357           22 :             opts->specified_opts |= SUBOPT_MAX_RETENTION_DURATION;
     358           22 :             opts->maxretention = defGetInt32(defel);
     359              : 
     360           18 :             if (opts->maxretention < 0)
     361            8 :                 ereport(ERROR,
     362              :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     363              :                         errmsg("max_retention_duration cannot be negative"));
     364              :         }
     365           55 :         else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
     366           40 :                  strcmp(defel->defname, "origin") == 0)
     367              :         {
     368           24 :             if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
     369            0 :                 errorConflictingDefElem(defel, pstate);
     370              : 
     371           24 :             opts->specified_opts |= SUBOPT_ORIGIN;
     372           24 :             pfree(opts->origin);
     373              : 
     374              :             /*
     375              :              * Even though the "origin" parameter allows only "none" and "any"
     376              :              * values, it is implemented as a string type so that the
     377              :              * parameter can be extended in future versions to support
     378              :              * filtering using origin names specified by the user.
     379              :              */
     380           24 :             opts->origin = defGetString(defel);
     381              : 
     382           34 :             if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
     383           10 :                 (pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
     384            4 :                 ereport(ERROR,
     385              :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     386              :                         errmsg("unrecognized origin value: \"%s\"", opts->origin));
     387              :         }
     388           31 :         else if (IsSet(supported_opts, SUBOPT_LSN) &&
     389           15 :                  strcmp(defel->defname, "lsn") == 0)
     390           11 :         {
     391           15 :             char       *lsn_str = defGetString(defel);
     392              :             XLogRecPtr  lsn;
     393              : 
     394           15 :             if (IsSet(opts->specified_opts, SUBOPT_LSN))
     395            0 :                 errorConflictingDefElem(defel, pstate);
     396              : 
     397              :             /* Setting lsn = NONE is treated as resetting LSN */
     398           15 :             if (strcmp(lsn_str, "none") == 0)
     399            4 :                 lsn = InvalidXLogRecPtr;
     400              :             else
     401              :             {
     402              :                 /* Parse the argument as LSN */
     403           11 :                 lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
     404              :                                                       CStringGetDatum(lsn_str)));
     405              : 
     406           11 :                 if (!XLogRecPtrIsValid(lsn))
     407            4 :                     ereport(ERROR,
     408              :                             (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     409              :                              errmsg("invalid WAL location (LSN): %s", lsn_str)));
     410              :             }
     411              : 
     412           11 :             opts->specified_opts |= SUBOPT_LSN;
     413           11 :             opts->lsn = lsn;
     414              :         }
     415           16 :         else if (IsSet(supported_opts, SUBOPT_WAL_RECEIVER_TIMEOUT) &&
     416           16 :                  strcmp(defel->defname, "wal_receiver_timeout") == 0)
     417            8 :         {
     418              :             bool        parsed;
     419              :             int         val;
     420              : 
     421           12 :             if (IsSet(opts->specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
     422            0 :                 errorConflictingDefElem(defel, pstate);
     423              : 
     424           12 :             opts->specified_opts |= SUBOPT_WAL_RECEIVER_TIMEOUT;
     425           12 :             opts->wal_receiver_timeout = defGetString(defel);
     426              : 
     427              :             /*
     428              :              * Test if the given value is valid for wal_receiver_timeout GUC.
     429              :              * Skip this test if the value is -1, since -1 is allowed for the
     430              :              * wal_receiver_timeout subscription option, but not for the GUC
     431              :              * itself.
     432              :              */
     433           12 :             parsed = parse_int(opts->wal_receiver_timeout, &val, 0, NULL);
     434           12 :             if (!parsed || val != -1)
     435            8 :                 (void) set_config_option("wal_receiver_timeout", opts->wal_receiver_timeout,
     436              :                                          PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
     437              :                                          false, 0, false);
     438              :         }
     439              :         else
     440            4 :             ereport(ERROR,
     441              :                     (errcode(ERRCODE_SYNTAX_ERROR),
     442              :                      errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
     443              :     }
     444              : 
     445              :     /*
     446              :      * We've been explicitly asked to not connect, that requires some
     447              :      * additional processing.
     448              :      */
     449          568 :     if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
     450              :     {
     451              :         /* Check for incompatible options from the user. */
     452          113 :         if (opts->enabled &&
     453          113 :             IsSet(opts->specified_opts, SUBOPT_ENABLED))
     454            4 :             ereport(ERROR,
     455              :                     (errcode(ERRCODE_SYNTAX_ERROR),
     456              :             /*- translator: both %s are strings of the form "option = value" */
     457              :                      errmsg("%s and %s are mutually exclusive options",
     458              :                             "connect = false", "enabled = true")));
     459              : 
     460          109 :         if (opts->create_slot &&
     461          105 :             IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
     462            4 :             ereport(ERROR,
     463              :                     (errcode(ERRCODE_SYNTAX_ERROR),
     464              :                      errmsg("%s and %s are mutually exclusive options",
     465              :                             "connect = false", "create_slot = true")));
     466              : 
     467          105 :         if (opts->copy_data &&
     468          101 :             IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
     469            4 :             ereport(ERROR,
     470              :                     (errcode(ERRCODE_SYNTAX_ERROR),
     471              :                      errmsg("%s and %s are mutually exclusive options",
     472              :                             "connect = false", "copy_data = true")));
     473              : 
     474              :         /* Change the defaults of other options. */
     475          101 :         opts->enabled = false;
     476          101 :         opts->create_slot = false;
     477          101 :         opts->copy_data = false;
     478              :     }
     479              : 
     480              :     /*
     481              :      * Do additional checking for disallowed combination when slot_name = NONE
     482              :      * was used.
     483              :      */
     484          556 :     if (!opts->slot_name &&
     485          536 :         IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
     486              :     {
     487           93 :         if (opts->enabled)
     488              :         {
     489           12 :             if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
     490            4 :                 ereport(ERROR,
     491              :                         (errcode(ERRCODE_SYNTAX_ERROR),
     492              :                 /*- translator: both %s are strings of the form "option = value" */
     493              :                          errmsg("%s and %s are mutually exclusive options",
     494              :                                 "slot_name = NONE", "enabled = true")));
     495              :             else
     496            8 :                 ereport(ERROR,
     497              :                         (errcode(ERRCODE_SYNTAX_ERROR),
     498              :                 /*- translator: both %s are strings of the form "option = value" */
     499              :                          errmsg("subscription with %s must also set %s",
     500              :                                 "slot_name = NONE", "enabled = false")));
     501              :         }
     502              : 
     503           81 :         if (opts->create_slot)
     504              :         {
     505            8 :             if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
     506            4 :                 ereport(ERROR,
     507              :                         (errcode(ERRCODE_SYNTAX_ERROR),
     508              :                 /*- translator: both %s are strings of the form "option = value" */
     509              :                          errmsg("%s and %s are mutually exclusive options",
     510              :                                 "slot_name = NONE", "create_slot = true")));
     511              :             else
     512            4 :                 ereport(ERROR,
     513              :                         (errcode(ERRCODE_SYNTAX_ERROR),
     514              :                 /*- translator: both %s are strings of the form "option = value" */
     515              :                          errmsg("subscription with %s must also set %s",
     516              :                                 "slot_name = NONE", "create_slot = false")));
     517              :         }
     518              :     }
     519          536 : }
     520              : 
     521              : /*
     522              :  * Check that the specified publications are present on the publisher.
     523              :  */
     524              : static void
     525          135 : check_publications(WalReceiverConn *wrconn, List *publications)
     526              : {
     527              :     WalRcvExecResult *res;
     528              :     StringInfoData cmd;
     529              :     TupleTableSlot *slot;
     530          135 :     List       *publicationsCopy = NIL;
     531          135 :     Oid         tableRow[1] = {TEXTOID};
     532              : 
     533          135 :     initStringInfo(&cmd);
     534          135 :     appendStringInfoString(&cmd, "SELECT t.pubname FROM\n"
     535              :                            " pg_catalog.pg_publication t WHERE\n"
     536              :                            " t.pubname IN (");
     537          135 :     GetPublicationsStr(publications, &cmd, true);
     538          135 :     appendStringInfoChar(&cmd, ')');
     539              : 
     540          135 :     res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
     541          135 :     pfree(cmd.data);
     542              : 
     543          135 :     if (res->status != WALRCV_OK_TUPLES)
     544            0 :         ereport(ERROR,
     545              :                 errmsg("could not receive list of publications from the publisher: %s",
     546              :                        res->err));
     547              : 
     548          135 :     publicationsCopy = list_copy(publications);
     549              : 
     550              :     /* Process publication(s). */
     551          135 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     552          299 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     553              :     {
     554              :         char       *pubname;
     555              :         bool        isnull;
     556              : 
     557          164 :         pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
     558              :         Assert(!isnull);
     559              : 
     560              :         /* Delete the publication present in publisher from the list. */
     561          164 :         publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
     562          164 :         ExecClearTuple(slot);
     563              :     }
     564              : 
     565          135 :     ExecDropSingleTupleTableSlot(slot);
     566              : 
     567          135 :     walrcv_clear_result(res);
     568              : 
     569          135 :     if (list_length(publicationsCopy))
     570              :     {
     571              :         /* Prepare the list of non-existent publication(s) for error message. */
     572              :         StringInfoData pubnames;
     573              : 
     574            4 :         initStringInfo(&pubnames);
     575              : 
     576            4 :         GetPublicationsStr(publicationsCopy, &pubnames, false);
     577            4 :         ereport(WARNING,
     578              :                 errcode(ERRCODE_UNDEFINED_OBJECT),
     579              :                 errmsg_plural("publication %s does not exist on the publisher",
     580              :                               "publications %s do not exist on the publisher",
     581              :                               list_length(publicationsCopy),
     582              :                               pubnames.data));
     583              :     }
     584          135 : }
     585              : 
     586              : /*
     587              :  * Auxiliary function to build a text array out of a list of String nodes.
     588              :  */
     589              : static Datum
     590          229 : publicationListToArray(List *publist)
     591              : {
     592              :     ArrayType  *arr;
     593              :     Datum      *datums;
     594              :     MemoryContext memcxt;
     595              :     MemoryContext oldcxt;
     596              : 
     597              :     /* Create memory context for temporary allocations. */
     598          229 :     memcxt = AllocSetContextCreate(CurrentMemoryContext,
     599              :                                    "publicationListToArray to array",
     600              :                                    ALLOCSET_DEFAULT_SIZES);
     601          229 :     oldcxt = MemoryContextSwitchTo(memcxt);
     602              : 
     603          229 :     datums = palloc_array(Datum, list_length(publist));
     604              : 
     605          229 :     check_duplicates_in_publist(publist, datums);
     606              : 
     607          225 :     MemoryContextSwitchTo(oldcxt);
     608              : 
     609          225 :     arr = construct_array_builtin(datums, list_length(publist), TEXTOID);
     610              : 
     611          225 :     MemoryContextDelete(memcxt);
     612              : 
     613          225 :     return PointerGetDatum(arr);
     614              : }
     615              : 
     616              : /*
     617              :  * Create new subscription.
     618              :  */
     619              : ObjectAddress
     620          308 : CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
     621              :                    bool isTopLevel)
     622              : {
     623              :     Relation    rel;
     624              :     ObjectAddress myself;
     625              :     Oid         subid;
     626              :     bool        nulls[Natts_pg_subscription];
     627              :     Datum       values[Natts_pg_subscription];
     628          308 :     Oid         owner = GetUserId();
     629              :     HeapTuple   tup;
     630              :     Oid         serverid;
     631              :     char       *conninfo;
     632              :     char        originname[NAMEDATALEN];
     633              :     List       *publications;
     634              :     uint32      supported_opts;
     635          308 :     SubOpts     opts = {0};
     636              :     AclResult   aclresult;
     637              : 
     638              :     /*
     639              :      * Parse and check options.
     640              :      *
     641              :      * Connection and publication should not be specified here.
     642              :      */
     643          308 :     supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
     644              :                       SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
     645              :                       SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
     646              :                       SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
     647              :                       SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
     648              :                       SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
     649              :                       SUBOPT_RETAIN_DEAD_TUPLES |
     650              :                       SUBOPT_MAX_RETENTION_DURATION |
     651              :                       SUBOPT_WAL_RECEIVER_TIMEOUT | SUBOPT_ORIGIN);
     652          308 :     parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
     653              : 
     654              :     /*
     655              :      * Since creating a replication slot is not transactional, rolling back
     656              :      * the transaction leaves the created replication slot.  So we cannot run
     657              :      * CREATE SUBSCRIPTION inside a transaction block if creating a
     658              :      * replication slot.
     659              :      */
     660          244 :     if (opts.create_slot)
     661          138 :         PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
     662              : 
     663              :     /*
     664              :      * We don't want to allow unprivileged users to be able to trigger
     665              :      * attempts to access arbitrary network destinations, so require the user
     666              :      * to have been specifically authorized to create subscriptions.
     667              :      */
     668          240 :     if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
     669            4 :         ereport(ERROR,
     670              :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     671              :                  errmsg("permission denied to create subscription"),
     672              :                  errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
     673              :                            "pg_create_subscription")));
     674              : 
     675              :     /*
     676              :      * Since a subscription is a database object, we also check for CREATE
     677              :      * permission on the database.
     678              :      */
     679          236 :     aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
     680              :                                 owner, ACL_CREATE);
     681          236 :     if (aclresult != ACLCHECK_OK)
     682            8 :         aclcheck_error(aclresult, OBJECT_DATABASE,
     683            4 :                        get_database_name(MyDatabaseId));
     684              : 
     685              :     /*
     686              :      * Non-superusers are required to set a password for authentication, and
     687              :      * that password must be used by the target server, but the superuser can
     688              :      * exempt a subscription from this requirement.
     689              :      */
     690          232 :     if (!opts.passwordrequired && !superuser_arg(owner))
     691            4 :         ereport(ERROR,
     692              :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     693              :                  errmsg("password_required=false is superuser-only"),
     694              :                  errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
     695              : 
     696              :     /*
     697              :      * If built with appropriate switch, whine when regression-testing
     698              :      * conventions for subscription names are violated.
     699              :      */
     700              : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
     701              :     if (strncmp(stmt->subname, "regress_", 8) != 0)
     702              :         elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
     703              : #endif
     704              : 
     705          228 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     706              : 
     707              :     /* Check if name is used */
     708          228 :     subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
     709              :                             ObjectIdGetDatum(MyDatabaseId), CStringGetDatum(stmt->subname));
     710          228 :     if (OidIsValid(subid))
     711              :     {
     712            5 :         ereport(ERROR,
     713              :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     714              :                  errmsg("subscription \"%s\" already exists",
     715              :                         stmt->subname)));
     716              :     }
     717              : 
     718              :     /*
     719              :      * Ensure that system configuration parameters are set appropriately to
     720              :      * support retain_dead_tuples and max_retention_duration.
     721              :      */
     722          223 :     CheckSubDeadTupleRetention(true, !opts.enabled, WARNING,
     723          223 :                                opts.retaindeadtuples, opts.retaindeadtuples,
     724          223 :                                (opts.maxretention > 0));
     725              : 
     726          223 :     if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
     727          181 :         opts.slot_name == NULL)
     728          181 :         opts.slot_name = stmt->subname;
     729              : 
     730              :     /* The default for synchronous_commit of subscriptions is off. */
     731          223 :     if (opts.synchronous_commit == NULL)
     732          223 :         opts.synchronous_commit = "off";
     733              : 
     734              :     /*
     735              :      * The default for wal_receiver_timeout of subscriptions is -1, which
     736              :      * means the value is inherited from the server configuration, command
     737              :      * line, or role/database settings.
     738              :      */
     739          223 :     if (opts.wal_receiver_timeout == NULL)
     740          223 :         opts.wal_receiver_timeout = "-1";
     741              : 
     742              :     /* Load the library providing us libpq calls. */
     743          223 :     load_file("libpqwalreceiver", false);
     744              : 
     745          223 :     if (stmt->servername)
     746              :     {
     747              :         ForeignServer *server;
     748              : 
     749              :         Assert(!stmt->conninfo);
     750           18 :         conninfo = NULL;
     751              : 
     752           18 :         server = GetForeignServerByName(stmt->servername, false);
     753           18 :         aclresult = object_aclcheck(ForeignServerRelationId, server->serverid, owner, ACL_USAGE);
     754           18 :         if (aclresult != ACLCHECK_OK)
     755            4 :             aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername);
     756              : 
     757              :         /* make sure a user mapping exists */
     758           14 :         GetUserMapping(owner, server->serverid);
     759              : 
     760           10 :         serverid = server->serverid;
     761           10 :         conninfo = ForeignServerConnectionString(owner, server);
     762              :     }
     763              :     else
     764              :     {
     765              :         Assert(stmt->conninfo);
     766              : 
     767          205 :         serverid = InvalidOid;
     768          205 :         conninfo = stmt->conninfo;
     769              :     }
     770              : 
     771              :     /* Check the connection info string. */
     772          211 :     walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
     773              : 
     774          199 :     publications = stmt->publication;
     775              : 
     776              :     /* Everything ok, form a new tuple. */
     777          199 :     memset(values, 0, sizeof(values));
     778          199 :     memset(nulls, false, sizeof(nulls));
     779              : 
     780          199 :     subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
     781              :                                Anum_pg_subscription_oid);
     782          199 :     values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
     783          199 :     values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
     784          199 :     values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
     785          199 :     values[Anum_pg_subscription_subname - 1] =
     786          199 :         DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
     787          199 :     values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
     788          199 :     values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
     789          199 :     values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
     790          199 :     values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
     791          199 :     values[Anum_pg_subscription_subtwophasestate - 1] =
     792          199 :         CharGetDatum(opts.twophase ?
     793              :                      LOGICALREP_TWOPHASE_STATE_PENDING :
     794              :                      LOGICALREP_TWOPHASE_STATE_DISABLED);
     795          199 :     values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
     796          199 :     values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
     797          199 :     values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
     798          199 :     values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
     799          199 :     values[Anum_pg_subscription_subretaindeadtuples - 1] =
     800          199 :         BoolGetDatum(opts.retaindeadtuples);
     801          199 :     values[Anum_pg_subscription_submaxretention - 1] =
     802          199 :         Int32GetDatum(opts.maxretention);
     803          199 :     values[Anum_pg_subscription_subretentionactive - 1] =
     804          199 :         BoolGetDatum(opts.retaindeadtuples);
     805          199 :     values[Anum_pg_subscription_subserver - 1] = ObjectIdGetDatum(serverid);
     806          199 :     if (!OidIsValid(serverid))
     807          193 :         values[Anum_pg_subscription_subconninfo - 1] =
     808          193 :             CStringGetTextDatum(conninfo);
     809              :     else
     810            6 :         nulls[Anum_pg_subscription_subconninfo - 1] = true;
     811          199 :     if (opts.slot_name)
     812          185 :         values[Anum_pg_subscription_subslotname - 1] =
     813          185 :             DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
     814              :     else
     815           14 :         nulls[Anum_pg_subscription_subslotname - 1] = true;
     816          199 :     values[Anum_pg_subscription_subsynccommit - 1] =
     817          199 :         CStringGetTextDatum(opts.synchronous_commit);
     818          199 :     values[Anum_pg_subscription_subwalrcvtimeout - 1] =
     819          199 :         CStringGetTextDatum(opts.wal_receiver_timeout);
     820          195 :     values[Anum_pg_subscription_subpublications - 1] =
     821          199 :         publicationListToArray(publications);
     822          195 :     values[Anum_pg_subscription_suborigin - 1] =
     823          195 :         CStringGetTextDatum(opts.origin);
     824              : 
     825          195 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     826              : 
     827              :     /* Insert tuple into catalog. */
     828          195 :     CatalogTupleInsert(rel, tup);
     829          195 :     heap_freetuple(tup);
     830              : 
     831          195 :     recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
     832              : 
     833          195 :     ObjectAddressSet(myself, SubscriptionRelationId, subid);
     834              : 
     835          195 :     if (stmt->servername)
     836              :     {
     837              :         ObjectAddress referenced;
     838              : 
     839              :         Assert(OidIsValid(serverid));
     840              : 
     841            6 :         ObjectAddressSet(referenced, ForeignServerRelationId, serverid);
     842            6 :         recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
     843              :     }
     844              : 
     845              :     /*
     846              :      * A replication origin is currently created for all subscriptions,
     847              :      * including those that only contain sequences or are otherwise empty.
     848              :      *
     849              :      * XXX: While this is technically unnecessary, optimizing it would require
     850              :      * additional logic to skip origin creation during DDL operations and
     851              :      * apply workers initialization, and to handle origin creation dynamically
     852              :      * when tables are added to the subscription. It is not clear whether
     853              :      * preventing creation of origins is worth additional complexity.
     854              :      */
     855          195 :     ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
     856          195 :     replorigin_create(originname);
     857              : 
     858              :     /*
     859              :      * Connect to remote side to execute requested commands and fetch table
     860              :      * and sequence info.
     861              :      */
     862          195 :     if (opts.connect)
     863              :     {
     864              :         char       *err;
     865              :         WalReceiverConn *wrconn;
     866              :         bool        must_use_password;
     867              : 
     868              :         /* Try to connect to the publisher. */
     869          130 :         must_use_password = !superuser_arg(owner) && opts.passwordrequired;
     870          130 :         wrconn = walrcv_connect(conninfo, true, true, must_use_password,
     871              :                                 stmt->subname, &err);
     872          130 :         if (!wrconn)
     873            4 :             ereport(ERROR,
     874              :                     (errcode(ERRCODE_CONNECTION_FAILURE),
     875              :                      errmsg("subscription \"%s\" could not connect to the publisher: %s",
     876              :                             stmt->subname, err)));
     877              : 
     878          126 :         PG_TRY();
     879              :         {
     880          126 :             bool        has_tables = false;
     881              :             List       *pubrels;
     882              :             char        relation_state;
     883              : 
     884          126 :             check_publications(wrconn, publications);
     885          126 :             check_publications_origin_tables(wrconn, publications,
     886          126 :                                              opts.copy_data,
     887          126 :                                              opts.retaindeadtuples, opts.origin,
     888              :                                              NULL, 0, stmt->subname);
     889          126 :             check_publications_origin_sequences(wrconn, publications,
     890          126 :                                                 opts.copy_data, opts.origin,
     891              :                                                 NULL, 0, stmt->subname);
     892              : 
     893          126 :             if (opts.retaindeadtuples)
     894            3 :                 check_pub_dead_tuple_retention(wrconn);
     895              : 
     896              :             /*
     897              :              * Set sync state based on if we were asked to do data copy or
     898              :              * not.
     899              :              */
     900          126 :             relation_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
     901              : 
     902              :             /*
     903              :              * Build local relation status info. Relations are for both tables
     904              :              * and sequences from the publisher.
     905              :              */
     906          126 :             pubrels = fetch_relation_list(wrconn, publications);
     907              : 
     908          445 :             foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
     909              :             {
     910              :                 Oid         relid;
     911              :                 char        relkind;
     912          195 :                 RangeVar   *rv = pubrelinfo->rv;
     913              : 
     914          195 :                 relid = RangeVarGetRelid(rv, AccessShareLock, false);
     915          195 :                 relkind = get_rel_relkind(relid);
     916              : 
     917              :                 /* Check for supported relkind. */
     918          195 :                 CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
     919          195 :                                          rv->schemaname, rv->relname);
     920          195 :                 has_tables |= (relkind != RELKIND_SEQUENCE);
     921          195 :                 AddSubscriptionRelState(subid, relid, relation_state,
     922              :                                         InvalidXLogRecPtr, true);
     923              :             }
     924              : 
     925              :             /*
     926              :              * If requested, create permanent slot for the subscription. We
     927              :              * won't use the initial snapshot for anything, so no need to
     928              :              * export it.
     929              :              *
     930              :              * XXX: Similar to origins, it is not clear whether preventing the
     931              :              * slot creation for empty and sequence-only subscriptions is
     932              :              * worth additional complexity.
     933              :              */
     934          125 :             if (opts.create_slot)
     935              :             {
     936          120 :                 bool        twophase_enabled = false;
     937              : 
     938              :                 Assert(opts.slot_name);
     939              : 
     940              :                 /*
     941              :                  * Even if two_phase is set, don't create the slot with
     942              :                  * two-phase enabled. Will enable it once all the tables are
     943              :                  * synced and ready. This avoids race-conditions like prepared
     944              :                  * transactions being skipped due to changes not being applied
     945              :                  * due to checks in should_apply_changes_for_rel() when
     946              :                  * tablesync for the corresponding tables are in progress. See
     947              :                  * comments atop worker.c.
     948              :                  *
     949              :                  * Note that if tables were specified but copy_data is false
     950              :                  * then it is safe to enable two_phase up-front because those
     951              :                  * tables are already initially in READY state. When the
     952              :                  * subscription has no tables, we leave the twophase state as
     953              :                  * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
     954              :                  * PUBLICATION to work.
     955              :                  */
     956          120 :                 if (opts.twophase && !opts.copy_data && has_tables)
     957            1 :                     twophase_enabled = true;
     958              : 
     959          120 :                 walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
     960              :                                    opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
     961              : 
     962          120 :                 if (twophase_enabled)
     963            1 :                     UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
     964              : 
     965          120 :                 ereport(NOTICE,
     966              :                         (errmsg("created replication slot \"%s\" on publisher",
     967              :                                 opts.slot_name)));
     968              :             }
     969              :         }
     970            1 :         PG_FINALLY();
     971              :         {
     972          126 :             walrcv_disconnect(wrconn);
     973              :         }
     974          126 :         PG_END_TRY();
     975              :     }
     976              :     else
     977           65 :         ereport(WARNING,
     978              :                 (errmsg("subscription was created, but is not connected"),
     979              :                  errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.")));
     980              : 
     981          190 :     table_close(rel, RowExclusiveLock);
     982              : 
     983          190 :     pgstat_create_subscription(subid);
     984              : 
     985              :     /*
     986              :      * Notify the launcher to start the apply worker if the subscription is
     987              :      * enabled, or to create the conflict detection slot if retain_dead_tuples
     988              :      * is enabled.
     989              :      *
     990              :      * Creating the conflict detection slot is essential even when the
     991              :      * subscription is not enabled. This ensures that dead tuples are
     992              :      * retained, which is necessary for accurately identifying the type of
     993              :      * conflict during replication.
     994              :      */
     995          190 :     if (opts.enabled || opts.retaindeadtuples)
     996          119 :         ApplyLauncherWakeupAtCommit();
     997              : 
     998          190 :     InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
     999              : 
    1000          190 :     return myself;
    1001              : }
    1002              : 
    1003              : static void
    1004           39 : AlterSubscription_refresh(Subscription *sub, bool copy_data,
    1005              :                           List *validate_publications)
    1006              : {
    1007              :     char       *err;
    1008           39 :     List       *pubrels = NIL;
    1009              :     Oid        *pubrel_local_oids;
    1010              :     List       *subrel_states;
    1011           39 :     List       *sub_remove_rels = NIL;
    1012              :     Oid        *subrel_local_oids;
    1013              :     Oid        *subseq_local_oids;
    1014              :     int         subrel_count;
    1015              :     ListCell   *lc;
    1016              :     int         off;
    1017           39 :     int         tbl_count = 0;
    1018           39 :     int         seq_count = 0;
    1019           39 :     Relation    rel = NULL;
    1020              :     typedef struct SubRemoveRels
    1021              :     {
    1022              :         Oid         relid;
    1023              :         char        state;
    1024              :     } SubRemoveRels;
    1025              : 
    1026              :     WalReceiverConn *wrconn;
    1027              :     bool        must_use_password;
    1028              : 
    1029              :     /* Load the library providing us libpq calls. */
    1030           39 :     load_file("libpqwalreceiver", false);
    1031              : 
    1032              :     /* Try to connect to the publisher. */
    1033           39 :     must_use_password = sub->passwordrequired && !sub->ownersuperuser;
    1034           39 :     wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
    1035              :                             sub->name, &err);
    1036           38 :     if (!wrconn)
    1037            0 :         ereport(ERROR,
    1038              :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1039              :                  errmsg("subscription \"%s\" could not connect to the publisher: %s",
    1040              :                         sub->name, err)));
    1041              : 
    1042           38 :     PG_TRY();
    1043              :     {
    1044           38 :         if (validate_publications)
    1045            9 :             check_publications(wrconn, validate_publications);
    1046              : 
    1047              :         /* Get the relation list from publisher. */
    1048           38 :         pubrels = fetch_relation_list(wrconn, sub->publications);
    1049              : 
    1050              :         /* Get local relation list. */
    1051           38 :         subrel_states = GetSubscriptionRelations(sub->oid, true, true, false);
    1052           38 :         subrel_count = list_length(subrel_states);
    1053              : 
    1054              :         /*
    1055              :          * Build qsorted arrays of local table oids and sequence oids for
    1056              :          * faster lookup. This can potentially contain all tables and
    1057              :          * sequences in the database so speed of lookup is important.
    1058              :          *
    1059              :          * We do not yet know the exact count of tables and sequences, so we
    1060              :          * allocate separate arrays for table OIDs and sequence OIDs based on
    1061              :          * the total number of relations (subrel_count).
    1062              :          */
    1063           38 :         subrel_local_oids = palloc(subrel_count * sizeof(Oid));
    1064           38 :         subseq_local_oids = palloc(subrel_count * sizeof(Oid));
    1065          136 :         foreach(lc, subrel_states)
    1066              :         {
    1067           98 :             SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
    1068              : 
    1069           98 :             if (get_rel_relkind(relstate->relid) == RELKIND_SEQUENCE)
    1070            9 :                 subseq_local_oids[seq_count++] = relstate->relid;
    1071              :             else
    1072           89 :                 subrel_local_oids[tbl_count++] = relstate->relid;
    1073              :         }
    1074              : 
    1075           38 :         qsort(subrel_local_oids, tbl_count, sizeof(Oid), oid_cmp);
    1076           38 :         check_publications_origin_tables(wrconn, sub->publications, copy_data,
    1077           38 :                                          sub->retaindeadtuples, sub->origin,
    1078              :                                          subrel_local_oids, tbl_count,
    1079              :                                          sub->name);
    1080              : 
    1081           38 :         qsort(subseq_local_oids, seq_count, sizeof(Oid), oid_cmp);
    1082           38 :         check_publications_origin_sequences(wrconn, sub->publications,
    1083              :                                             copy_data, sub->origin,
    1084              :                                             subseq_local_oids, seq_count,
    1085              :                                             sub->name);
    1086              : 
    1087              :         /*
    1088              :          * Walk over the remote relations and try to match them to locally
    1089              :          * known relations. If the relation is not known locally create a new
    1090              :          * state for it.
    1091              :          *
    1092              :          * Also builds array of local oids of remote relations for the next
    1093              :          * step.
    1094              :          */
    1095           38 :         off = 0;
    1096           38 :         pubrel_local_oids = palloc(list_length(pubrels) * sizeof(Oid));
    1097              : 
    1098          183 :         foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
    1099              :         {
    1100          107 :             RangeVar   *rv = pubrelinfo->rv;
    1101              :             Oid         relid;
    1102              :             char        relkind;
    1103              : 
    1104          107 :             relid = RangeVarGetRelid(rv, AccessShareLock, false);
    1105          107 :             relkind = get_rel_relkind(relid);
    1106              : 
    1107              :             /* Check for supported relkind. */
    1108          107 :             CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
    1109          107 :                                      rv->schemaname, rv->relname);
    1110              : 
    1111          107 :             pubrel_local_oids[off++] = relid;
    1112              : 
    1113          107 :             if (!bsearch(&relid, subrel_local_oids,
    1114           39 :                          tbl_count, sizeof(Oid), oid_cmp) &&
    1115           39 :                 !bsearch(&relid, subseq_local_oids,
    1116              :                          seq_count, sizeof(Oid), oid_cmp))
    1117              :             {
    1118           30 :                 AddSubscriptionRelState(sub->oid, relid,
    1119              :                                         copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
    1120              :                                         InvalidXLogRecPtr, true);
    1121           30 :                 ereport(DEBUG1,
    1122              :                         errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"",
    1123              :                                         relkind == RELKIND_SEQUENCE ? "sequence" : "table",
    1124              :                                         rv->schemaname, rv->relname, sub->name));
    1125              :             }
    1126              :         }
    1127              : 
    1128              :         /*
    1129              :          * Next remove state for tables we should not care about anymore using
    1130              :          * the data we collected above
    1131              :          */
    1132           38 :         qsort(pubrel_local_oids, list_length(pubrels), sizeof(Oid), oid_cmp);
    1133              : 
    1134          127 :         for (off = 0; off < tbl_count; off++)
    1135              :         {
    1136           89 :             Oid         relid = subrel_local_oids[off];
    1137              : 
    1138           89 :             if (!bsearch(&relid, pubrel_local_oids,
    1139           89 :                          list_length(pubrels), sizeof(Oid), oid_cmp))
    1140              :             {
    1141              :                 char        state;
    1142              :                 XLogRecPtr  statelsn;
    1143           21 :                 SubRemoveRels *remove_rel = palloc_object(SubRemoveRels);
    1144              : 
    1145              :                 /*
    1146              :                  * Lock pg_subscription_rel with AccessExclusiveLock to
    1147              :                  * prevent any race conditions with the apply worker
    1148              :                  * re-launching workers at the same time this code is trying
    1149              :                  * to remove those tables.
    1150              :                  *
    1151              :                  * Even if new worker for this particular rel is restarted it
    1152              :                  * won't be able to make any progress as we hold exclusive
    1153              :                  * lock on pg_subscription_rel till the transaction end. It
    1154              :                  * will simply exit as there is no corresponding rel entry.
    1155              :                  *
    1156              :                  * This locking also ensures that the state of rels won't
    1157              :                  * change till we are done with this refresh operation.
    1158              :                  */
    1159           21 :                 if (!rel)
    1160            9 :                     rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
    1161              : 
    1162              :                 /* Last known rel state. */
    1163           21 :                 state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
    1164              : 
    1165           21 :                 RemoveSubscriptionRel(sub->oid, relid);
    1166              : 
    1167           21 :                 remove_rel->relid = relid;
    1168           21 :                 remove_rel->state = state;
    1169              : 
    1170           21 :                 sub_remove_rels = lappend(sub_remove_rels, remove_rel);
    1171              : 
    1172           21 :                 logicalrep_worker_stop(WORKERTYPE_TABLESYNC, sub->oid, relid);
    1173              : 
    1174              :                 /*
    1175              :                  * For READY state, we would have already dropped the
    1176              :                  * tablesync origin.
    1177              :                  */
    1178           21 :                 if (state != SUBREL_STATE_READY)
    1179              :                 {
    1180              :                     char        originname[NAMEDATALEN];
    1181              : 
    1182              :                     /*
    1183              :                      * Drop the tablesync's origin tracking if exists.
    1184              :                      *
    1185              :                      * It is possible that the origin is not yet created for
    1186              :                      * tablesync worker, this can happen for the states before
    1187              :                      * SUBREL_STATE_DATASYNC. The tablesync worker or apply
    1188              :                      * worker can also concurrently try to drop the origin and
    1189              :                      * by this time the origin might be already removed. For
    1190              :                      * these reasons, passing missing_ok = true.
    1191              :                      */
    1192            0 :                     ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
    1193              :                                                        sizeof(originname));
    1194            0 :                     replorigin_drop_by_name(originname, true, false);
    1195              :                 }
    1196              : 
    1197           21 :                 ereport(DEBUG1,
    1198              :                         (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
    1199              :                                          get_namespace_name(get_rel_namespace(relid)),
    1200              :                                          get_rel_name(relid),
    1201              :                                          sub->name)));
    1202              :             }
    1203              :         }
    1204              : 
    1205              :         /*
    1206              :          * Drop the tablesync slots associated with removed tables. This has
    1207              :          * to be at the end because otherwise if there is an error while doing
    1208              :          * the database operations we won't be able to rollback dropped slots.
    1209              :          */
    1210           97 :         foreach_ptr(SubRemoveRels, sub_remove_rel, sub_remove_rels)
    1211              :         {
    1212           21 :             if (sub_remove_rel->state != SUBREL_STATE_READY &&
    1213            0 :                 sub_remove_rel->state != SUBREL_STATE_SYNCDONE)
    1214              :             {
    1215            0 :                 char        syncslotname[NAMEDATALEN] = {0};
    1216              : 
    1217              :                 /*
    1218              :                  * For READY/SYNCDONE states we know the tablesync slot has
    1219              :                  * already been dropped by the tablesync worker.
    1220              :                  *
    1221              :                  * For other states, there is no certainty, maybe the slot
    1222              :                  * does not exist yet. Also, if we fail after removing some of
    1223              :                  * the slots, next time, it will again try to drop already
    1224              :                  * dropped slots and fail. For these reasons, we allow
    1225              :                  * missing_ok = true for the drop.
    1226              :                  */
    1227            0 :                 ReplicationSlotNameForTablesync(sub->oid, sub_remove_rel->relid,
    1228              :                                                 syncslotname, sizeof(syncslotname));
    1229            0 :                 ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
    1230              :             }
    1231              :         }
    1232              : 
    1233              :         /*
    1234              :          * Next remove state for sequences we should not care about anymore
    1235              :          * using the data we collected above
    1236              :          */
    1237           47 :         for (off = 0; off < seq_count; off++)
    1238              :         {
    1239            9 :             Oid         relid = subseq_local_oids[off];
    1240              : 
    1241            9 :             if (!bsearch(&relid, pubrel_local_oids,
    1242            9 :                          list_length(pubrels), sizeof(Oid), oid_cmp))
    1243              :             {
    1244              :                 /*
    1245              :                  * This locking ensures that the state of rels won't change
    1246              :                  * till we are done with this refresh operation.
    1247              :                  */
    1248            0 :                 if (!rel)
    1249            0 :                     rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
    1250              : 
    1251            0 :                 RemoveSubscriptionRel(sub->oid, relid);
    1252              : 
    1253            0 :                 ereport(DEBUG1,
    1254              :                         errmsg_internal("sequence \"%s.%s\" removed from subscription \"%s\"",
    1255              :                                         get_namespace_name(get_rel_namespace(relid)),
    1256              :                                         get_rel_name(relid),
    1257              :                                         sub->name));
    1258              :             }
    1259              :         }
    1260              :     }
    1261            0 :     PG_FINALLY();
    1262              :     {
    1263           38 :         walrcv_disconnect(wrconn);
    1264              :     }
    1265           38 :     PG_END_TRY();
    1266              : 
    1267           38 :     if (rel)
    1268            9 :         table_close(rel, NoLock);
    1269           38 : }
    1270              : 
    1271              : /*
    1272              :  * Marks all sequences with INIT state.
    1273              :  */
    1274              : static void
    1275            2 : AlterSubscription_refresh_seq(Subscription *sub)
    1276              : {
    1277            2 :     char       *err = NULL;
    1278              :     WalReceiverConn *wrconn;
    1279              :     bool        must_use_password;
    1280              : 
    1281              :     /* Load the library providing us libpq calls. */
    1282            2 :     load_file("libpqwalreceiver", false);
    1283              : 
    1284              :     /* Try to connect to the publisher. */
    1285            2 :     must_use_password = sub->passwordrequired && !sub->ownersuperuser;
    1286            2 :     wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
    1287              :                             sub->name, &err);
    1288            2 :     if (!wrconn)
    1289            0 :         ereport(ERROR,
    1290              :                 errcode(ERRCODE_CONNECTION_FAILURE),
    1291              :                 errmsg("subscription \"%s\" could not connect to the publisher: %s",
    1292              :                        sub->name, err));
    1293              : 
    1294            2 :     PG_TRY();
    1295              :     {
    1296              :         List       *subrel_states;
    1297              : 
    1298            2 :         check_publications_origin_sequences(wrconn, sub->publications, true,
    1299              :                                             sub->origin, NULL, 0, sub->name);
    1300              : 
    1301              :         /* Get local sequence list. */
    1302            2 :         subrel_states = GetSubscriptionRelations(sub->oid, false, true, false);
    1303           12 :         foreach_ptr(SubscriptionRelState, subrel, subrel_states)
    1304              :         {
    1305            8 :             Oid         relid = subrel->relid;
    1306              : 
    1307            8 :             UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT,
    1308              :                                        InvalidXLogRecPtr, false);
    1309            8 :             ereport(DEBUG1,
    1310              :                     errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
    1311              :                                     get_namespace_name(get_rel_namespace(relid)),
    1312              :                                     get_rel_name(relid),
    1313              :                                     sub->name));
    1314              :         }
    1315              :     }
    1316            0 :     PG_FINALLY();
    1317              :     {
    1318            2 :         walrcv_disconnect(wrconn);
    1319              :     }
    1320            2 :     PG_END_TRY();
    1321            2 : }
    1322              : 
    1323              : /*
    1324              :  * Common checks for altering failover, two_phase, and retain_dead_tuples
    1325              :  * options.
    1326              :  */
    1327              : static void
    1328           14 : CheckAlterSubOption(Subscription *sub, const char *option,
    1329              :                     bool slot_needs_update, bool isTopLevel)
    1330              : {
    1331              :     Assert(strcmp(option, "failover") == 0 ||
    1332              :            strcmp(option, "two_phase") == 0 ||
    1333              :            strcmp(option, "retain_dead_tuples") == 0);
    1334              : 
    1335              :     /*
    1336              :      * Altering the retain_dead_tuples option does not update the slot on the
    1337              :      * publisher.
    1338              :      */
    1339              :     Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0);
    1340              : 
    1341              :     /*
    1342              :      * Do not allow changing the option if the subscription is enabled. This
    1343              :      * is because both failover and two_phase options of the slot on the
    1344              :      * publisher cannot be modified if the slot is currently acquired by the
    1345              :      * existing walsender.
    1346              :      *
    1347              :      * Note that two_phase is enabled (aka changed from 'false' to 'true') on
    1348              :      * the publisher by the existing walsender, so we could have allowed that
    1349              :      * even when the subscription is enabled. But we kept this restriction for
    1350              :      * the sake of consistency and simplicity.
    1351              :      *
    1352              :      * Additionally, do not allow changing the retain_dead_tuples option when
    1353              :      * the subscription is enabled to prevent race conditions arising from the
    1354              :      * new option value being acknowledged asynchronously by the launcher and
    1355              :      * apply workers.
    1356              :      *
    1357              :      * Without the restriction, a race condition may arise when a user
    1358              :      * disables and immediately re-enables the retain_dead_tuples option. In
    1359              :      * this case, the launcher might drop the slot upon noticing the disabled
    1360              :      * action, while the apply worker may keep maintaining
    1361              :      * oldest_nonremovable_xid without noticing the option change. During this
    1362              :      * period, a transaction ID wraparound could falsely make this ID appear
    1363              :      * as if it originates from the future w.r.t the transaction ID stored in
    1364              :      * the slot maintained by launcher.
    1365              :      *
    1366              :      * Similarly, if the user enables retain_dead_tuples concurrently with the
    1367              :      * launcher starting the worker, the apply worker may start calculating
    1368              :      * oldest_nonremovable_xid before the launcher notices the enable action.
    1369              :      * Consequently, the launcher may update slot.xmin to a newer value than
    1370              :      * that maintained by the worker. In subsequent cycles, upon integrating
    1371              :      * the worker's oldest_nonremovable_xid, the launcher might detect a
    1372              :      * retreat in the calculated xmin, necessitating additional handling.
    1373              :      *
    1374              :      * XXX To address the above race conditions, we can define
    1375              :      * oldest_nonremovable_xid as FullTransactionId and adds the check to
    1376              :      * disallow retreating the conflict slot's xmin. For now, we kept the
    1377              :      * implementation simple by disallowing change to the retain_dead_tuples,
    1378              :      * but in the future we can change this after some more analysis.
    1379              :      *
    1380              :      * Note that we could restrict only the enabling of retain_dead_tuples to
    1381              :      * avoid the race conditions described above, but we maintain the
    1382              :      * restriction for both enable and disable operations for the sake of
    1383              :      * consistency.
    1384              :      */
    1385           14 :     if (sub->enabled)
    1386            2 :         ereport(ERROR,
    1387              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1388              :                  errmsg("cannot set option \"%s\" for enabled subscription",
    1389              :                         option)));
    1390              : 
    1391           12 :     if (slot_needs_update)
    1392              :     {
    1393              :         StringInfoData cmd;
    1394              : 
    1395              :         /*
    1396              :          * A valid slot must be associated with the subscription for us to
    1397              :          * modify any of the slot's properties.
    1398              :          */
    1399            9 :         if (!sub->slotname)
    1400            0 :             ereport(ERROR,
    1401              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1402              :                      errmsg("cannot set option \"%s\" for a subscription that does not have a slot name",
    1403              :                             option)));
    1404              : 
    1405              :         /* The changed option of the slot can't be rolled back. */
    1406            9 :         initStringInfo(&cmd);
    1407            9 :         appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
    1408              : 
    1409            9 :         PreventInTransactionBlock(isTopLevel, cmd.data);
    1410            5 :         pfree(cmd.data);
    1411              :     }
    1412            8 : }
    1413              : 
    1414              : /*
    1415              :  * Alter the existing subscription.
    1416              :  */
    1417              : ObjectAddress
    1418          341 : AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
    1419              :                   bool isTopLevel)
    1420              : {
    1421              :     Relation    rel;
    1422              :     ObjectAddress myself;
    1423              :     bool        nulls[Natts_pg_subscription];
    1424              :     bool        replaces[Natts_pg_subscription];
    1425              :     Datum       values[Natts_pg_subscription];
    1426              :     HeapTuple   tup;
    1427              :     Oid         subid;
    1428          341 :     bool        update_tuple = false;
    1429          341 :     bool        update_failover = false;
    1430          341 :     bool        update_two_phase = false;
    1431          341 :     bool        check_pub_rdt = false;
    1432              :     bool        retain_dead_tuples;
    1433              :     int         max_retention;
    1434              :     bool        retention_active;
    1435          341 :     char       *new_conninfo = NULL;
    1436              :     char       *origin;
    1437              :     Subscription *sub;
    1438              :     Form_pg_subscription form;
    1439              :     uint32      supported_opts;
    1440          341 :     SubOpts     opts = {0};
    1441              : 
    1442          341 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    1443              : 
    1444              :     /* Fetch the existing tuple. */
    1445          341 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
    1446              :                               CStringGetDatum(stmt->subname));
    1447              : 
    1448          341 :     if (!HeapTupleIsValid(tup))
    1449            4 :         ereport(ERROR,
    1450              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1451              :                  errmsg("subscription \"%s\" does not exist",
    1452              :                         stmt->subname)));
    1453              : 
    1454          337 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    1455          337 :     subid = form->oid;
    1456              : 
    1457              :     /* must be owner */
    1458          337 :     if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
    1459            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
    1460            0 :                        stmt->subname);
    1461              : 
    1462              :     /*
    1463              :      * Skip ACL checks on the subscription's foreign server, if any. If
    1464              :      * changing the server (or replacing it with a raw connection), then the
    1465              :      * old one will be removed anyway. If changing something unrelated,
    1466              :      * there's no need to do an additional ACL check here; that will be done
    1467              :      * by the subscription worker anyway.
    1468              :      */
    1469          337 :     sub = GetSubscription(subid, false, false);
    1470              : 
    1471          337 :     retain_dead_tuples = sub->retaindeadtuples;
    1472          337 :     origin = sub->origin;
    1473          337 :     max_retention = sub->maxretention;
    1474          337 :     retention_active = sub->retentionactive;
    1475              : 
    1476              :     /*
    1477              :      * Don't allow non-superuser modification of a subscription with
    1478              :      * password_required=false.
    1479              :      */
    1480          337 :     if (!sub->passwordrequired && !superuser())
    1481            0 :         ereport(ERROR,
    1482              :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1483              :                  errmsg("password_required=false is superuser-only"),
    1484              :                  errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
    1485              : 
    1486              :     /* Lock the subscription so nobody else can do anything with it. */
    1487          337 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
    1488              : 
    1489              :     /* Form a new tuple. */
    1490          337 :     memset(values, 0, sizeof(values));
    1491          337 :     memset(nulls, false, sizeof(nulls));
    1492          337 :     memset(replaces, false, sizeof(replaces));
    1493              : 
    1494          337 :     ObjectAddressSet(myself, SubscriptionRelationId, subid);
    1495              : 
    1496          337 :     switch (stmt->kind)
    1497              :     {
    1498          156 :         case ALTER_SUBSCRIPTION_OPTIONS:
    1499              :             {
    1500          156 :                 supported_opts = (SUBOPT_SLOT_NAME |
    1501              :                                   SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
    1502              :                                   SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
    1503              :                                   SUBOPT_DISABLE_ON_ERR |
    1504              :                                   SUBOPT_PASSWORD_REQUIRED |
    1505              :                                   SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
    1506              :                                   SUBOPT_RETAIN_DEAD_TUPLES |
    1507              :                                   SUBOPT_MAX_RETENTION_DURATION |
    1508              :                                   SUBOPT_WAL_RECEIVER_TIMEOUT |
    1509              :                                   SUBOPT_ORIGIN);
    1510              : 
    1511          156 :                 parse_subscription_options(pstate, stmt->options,
    1512              :                                            supported_opts, &opts);
    1513              : 
    1514          136 :                 if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
    1515              :                 {
    1516              :                     /*
    1517              :                      * The subscription must be disabled to allow slot_name as
    1518              :                      * 'none', otherwise, the apply worker will repeatedly try
    1519              :                      * to stream the data using that slot_name which neither
    1520              :                      * exists on the publisher nor the user will be allowed to
    1521              :                      * create it.
    1522              :                      */
    1523           51 :                     if (sub->enabled && !opts.slot_name)
    1524            0 :                         ereport(ERROR,
    1525              :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1526              :                                  errmsg("cannot set %s for enabled subscription",
    1527              :                                         "slot_name = NONE")));
    1528              : 
    1529           51 :                     if (opts.slot_name)
    1530            4 :                         values[Anum_pg_subscription_subslotname - 1] =
    1531            4 :                             DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
    1532              :                     else
    1533           47 :                         nulls[Anum_pg_subscription_subslotname - 1] = true;
    1534           51 :                     replaces[Anum_pg_subscription_subslotname - 1] = true;
    1535              :                 }
    1536              : 
    1537          136 :                 if (opts.synchronous_commit)
    1538              :                 {
    1539            4 :                     values[Anum_pg_subscription_subsynccommit - 1] =
    1540            4 :                         CStringGetTextDatum(opts.synchronous_commit);
    1541            4 :                     replaces[Anum_pg_subscription_subsynccommit - 1] = true;
    1542              :                 }
    1543              : 
    1544          136 :                 if (IsSet(opts.specified_opts, SUBOPT_BINARY))
    1545              :                 {
    1546           10 :                     values[Anum_pg_subscription_subbinary - 1] =
    1547           10 :                         BoolGetDatum(opts.binary);
    1548           10 :                     replaces[Anum_pg_subscription_subbinary - 1] = true;
    1549              :                 }
    1550              : 
    1551          136 :                 if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
    1552              :                 {
    1553           18 :                     values[Anum_pg_subscription_substream - 1] =
    1554           18 :                         CharGetDatum(opts.streaming);
    1555           18 :                     replaces[Anum_pg_subscription_substream - 1] = true;
    1556              :                 }
    1557              : 
    1558          136 :                 if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
    1559              :                 {
    1560              :                     values[Anum_pg_subscription_subdisableonerr - 1]
    1561            4 :                         = BoolGetDatum(opts.disableonerr);
    1562              :                     replaces[Anum_pg_subscription_subdisableonerr - 1]
    1563            4 :                         = true;
    1564              :                 }
    1565              : 
    1566          136 :                 if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
    1567              :                 {
    1568              :                     /* Non-superuser may not disable password_required. */
    1569            8 :                     if (!opts.passwordrequired && !superuser())
    1570            0 :                         ereport(ERROR,
    1571              :                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1572              :                                  errmsg("password_required=false is superuser-only"),
    1573              :                                  errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
    1574              : 
    1575              :                     values[Anum_pg_subscription_subpasswordrequired - 1]
    1576            8 :                         = BoolGetDatum(opts.passwordrequired);
    1577              :                     replaces[Anum_pg_subscription_subpasswordrequired - 1]
    1578            8 :                         = true;
    1579              :                 }
    1580              : 
    1581          136 :                 if (IsSet(opts.specified_opts, SUBOPT_RUN_AS_OWNER))
    1582              :                 {
    1583            9 :                     values[Anum_pg_subscription_subrunasowner - 1] =
    1584            9 :                         BoolGetDatum(opts.runasowner);
    1585            9 :                     replaces[Anum_pg_subscription_subrunasowner - 1] = true;
    1586              :                 }
    1587              : 
    1588          136 :                 if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
    1589              :                 {
    1590              :                     /*
    1591              :                      * We need to update both the slot and the subscription
    1592              :                      * for the two_phase option. We can enable the two_phase
    1593              :                      * option for a slot only once the initial data
    1594              :                      * synchronization is done. This is to avoid missing some
    1595              :                      * data as explained in comments atop worker.c.
    1596              :                      */
    1597            3 :                     update_two_phase = !opts.twophase;
    1598              : 
    1599            3 :                     CheckAlterSubOption(sub, "two_phase", update_two_phase,
    1600              :                                         isTopLevel);
    1601              : 
    1602              :                     /*
    1603              :                      * Modifying the two_phase slot option requires a slot
    1604              :                      * lookup by slot name, so changing the slot name at the
    1605              :                      * same time is not allowed.
    1606              :                      */
    1607            3 :                     if (update_two_phase &&
    1608            1 :                         IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
    1609            0 :                         ereport(ERROR,
    1610              :                                 (errcode(ERRCODE_SYNTAX_ERROR),
    1611              :                                  errmsg("\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
    1612              : 
    1613              :                     /*
    1614              :                      * Note that workers may still survive even if the
    1615              :                      * subscription has been disabled.
    1616              :                      *
    1617              :                      * Ensure workers have already been exited to avoid
    1618              :                      * getting prepared transactions while we are disabling
    1619              :                      * the two_phase option. Otherwise, the changes of an
    1620              :                      * already prepared transaction can be replicated again
    1621              :                      * along with its corresponding commit, leading to
    1622              :                      * duplicate data or errors.
    1623              :                      */
    1624            3 :                     if (logicalrep_workers_find(subid, true, true))
    1625            0 :                         ereport(ERROR,
    1626              :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1627              :                                  errmsg("cannot alter \"two_phase\" when logical replication worker is still running"),
    1628              :                                  errhint("Try again after some time.")));
    1629              : 
    1630              :                     /*
    1631              :                      * two_phase cannot be disabled if there are any
    1632              :                      * uncommitted prepared transactions present otherwise it
    1633              :                      * can lead to duplicate data or errors as explained in
    1634              :                      * the comment above.
    1635              :                      */
    1636            3 :                     if (update_two_phase &&
    1637            1 :                         sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
    1638            1 :                         LookupGXactBySubid(subid))
    1639            0 :                         ereport(ERROR,
    1640              :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1641              :                                  errmsg("cannot disable \"two_phase\" when prepared transactions exist"),
    1642              :                                  errhint("Resolve these transactions and try again.")));
    1643              : 
    1644              :                     /* Change system catalog accordingly */
    1645            3 :                     values[Anum_pg_subscription_subtwophasestate - 1] =
    1646            3 :                         CharGetDatum(opts.twophase ?
    1647              :                                      LOGICALREP_TWOPHASE_STATE_PENDING :
    1648              :                                      LOGICALREP_TWOPHASE_STATE_DISABLED);
    1649            3 :                     replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
    1650              :                 }
    1651              : 
    1652          136 :                 if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
    1653              :                 {
    1654              :                     /*
    1655              :                      * Similar to the two_phase case above, we need to update
    1656              :                      * the failover option for both the slot and the
    1657              :                      * subscription.
    1658              :                      */
    1659            9 :                     update_failover = true;
    1660              : 
    1661            9 :                     CheckAlterSubOption(sub, "failover", update_failover,
    1662              :                                         isTopLevel);
    1663              : 
    1664            4 :                     values[Anum_pg_subscription_subfailover - 1] =
    1665            4 :                         BoolGetDatum(opts.failover);
    1666            4 :                     replaces[Anum_pg_subscription_subfailover - 1] = true;
    1667              :                 }
    1668              : 
    1669          131 :                 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
    1670              :                 {
    1671            2 :                     values[Anum_pg_subscription_subretaindeadtuples - 1] =
    1672            2 :                         BoolGetDatum(opts.retaindeadtuples);
    1673            2 :                     replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true;
    1674              : 
    1675              :                     /*
    1676              :                      * Update the retention status only if there's a change in
    1677              :                      * the retain_dead_tuples option value.
    1678              :                      *
    1679              :                      * Automatically marking retention as active when
    1680              :                      * retain_dead_tuples is enabled may not always be ideal,
    1681              :                      * especially if retention was previously stopped and the
    1682              :                      * user toggles retain_dead_tuples without adjusting the
    1683              :                      * publisher workload. However, this behavior provides a
    1684              :                      * convenient way for users to manually refresh the
    1685              :                      * retention status. Since retention will be stopped again
    1686              :                      * unless the publisher workload is reduced, this approach
    1687              :                      * is acceptable for now.
    1688              :                      */
    1689            2 :                     if (opts.retaindeadtuples != sub->retaindeadtuples)
    1690              :                     {
    1691            2 :                         values[Anum_pg_subscription_subretentionactive - 1] =
    1692            2 :                             BoolGetDatum(opts.retaindeadtuples);
    1693            2 :                         replaces[Anum_pg_subscription_subretentionactive - 1] = true;
    1694              : 
    1695            2 :                         retention_active = opts.retaindeadtuples;
    1696              :                     }
    1697              : 
    1698            2 :                     CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
    1699              : 
    1700              :                     /*
    1701              :                      * Workers may continue running even after the
    1702              :                      * subscription has been disabled.
    1703              :                      *
    1704              :                      * To prevent race conditions (as described in
    1705              :                      * CheckAlterSubOption()), ensure that all worker
    1706              :                      * processes have already exited before proceeding.
    1707              :                      */
    1708            1 :                     if (logicalrep_workers_find(subid, true, true))
    1709            0 :                         ereport(ERROR,
    1710              :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1711              :                                  errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
    1712              :                                  errhint("Try again after some time.")));
    1713              : 
    1714              :                     /*
    1715              :                      * Notify the launcher to manage the replication slot for
    1716              :                      * conflict detection. This ensures that replication slot
    1717              :                      * is efficiently handled (created, updated, or dropped)
    1718              :                      * in response to any configuration changes.
    1719              :                      */
    1720            1 :                     ApplyLauncherWakeupAtCommit();
    1721              : 
    1722            1 :                     check_pub_rdt = opts.retaindeadtuples;
    1723            1 :                     retain_dead_tuples = opts.retaindeadtuples;
    1724              :                 }
    1725              : 
    1726          130 :                 if (IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
    1727              :                 {
    1728            6 :                     values[Anum_pg_subscription_submaxretention - 1] =
    1729            6 :                         Int32GetDatum(opts.maxretention);
    1730            6 :                     replaces[Anum_pg_subscription_submaxretention - 1] = true;
    1731              : 
    1732            6 :                     max_retention = opts.maxretention;
    1733              :                 }
    1734              : 
    1735              :                 /*
    1736              :                  * Ensure that system configuration parameters are set
    1737              :                  * appropriately to support retain_dead_tuples and
    1738              :                  * max_retention_duration.
    1739              :                  */
    1740          130 :                 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) ||
    1741          129 :                     IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
    1742            7 :                     CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE,
    1743              :                                                retain_dead_tuples,
    1744              :                                                retention_active,
    1745            7 :                                                (max_retention > 0));
    1746              : 
    1747          130 :                 if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
    1748              :                 {
    1749            6 :                     values[Anum_pg_subscription_suborigin - 1] =
    1750            6 :                         CStringGetTextDatum(opts.origin);
    1751            6 :                     replaces[Anum_pg_subscription_suborigin - 1] = true;
    1752              : 
    1753              :                     /*
    1754              :                      * Check if changes from different origins may be received
    1755              :                      * from the publisher when the origin is changed to ANY
    1756              :                      * and retain_dead_tuples is enabled. Use |= so that we
    1757              :                      * don't clear the flag already set when
    1758              :                      * retain_dead_tuples was changed in the same command.
    1759              :                      */
    1760            8 :                     check_pub_rdt |= retain_dead_tuples &&
    1761            2 :                         pg_strcasecmp(opts.origin, LOGICALREP_ORIGIN_ANY) == 0;
    1762              : 
    1763            6 :                     origin = opts.origin;
    1764              :                 }
    1765              : 
    1766          130 :                 if (IsSet(opts.specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
    1767              :                 {
    1768            8 :                     values[Anum_pg_subscription_subwalrcvtimeout - 1] =
    1769            8 :                         CStringGetTextDatum(opts.wal_receiver_timeout);
    1770            8 :                     replaces[Anum_pg_subscription_subwalrcvtimeout - 1] = true;
    1771              :                 }
    1772              : 
    1773          130 :                 update_tuple = true;
    1774          130 :                 break;
    1775              :             }
    1776              : 
    1777           58 :         case ALTER_SUBSCRIPTION_ENABLED:
    1778              :             {
    1779           58 :                 parse_subscription_options(pstate, stmt->options,
    1780              :                                            SUBOPT_ENABLED, &opts);
    1781              :                 Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
    1782              : 
    1783           58 :                 if (!sub->slotname && opts.enabled)
    1784            4 :                     ereport(ERROR,
    1785              :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1786              :                              errmsg("cannot enable subscription that does not have a slot name")));
    1787              : 
    1788              :                 /*
    1789              :                  * Check track_commit_timestamp only when enabling the
    1790              :                  * subscription in case it was disabled after creation. See
    1791              :                  * comments atop CheckSubDeadTupleRetention() for details.
    1792              :                  */
    1793           54 :                 CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
    1794           54 :                                            WARNING, sub->retaindeadtuples,
    1795           54 :                                            sub->retentionactive, false);
    1796              : 
    1797           54 :                 values[Anum_pg_subscription_subenabled - 1] =
    1798           54 :                     BoolGetDatum(opts.enabled);
    1799           54 :                 replaces[Anum_pg_subscription_subenabled - 1] = true;
    1800              : 
    1801           54 :                 if (opts.enabled)
    1802           30 :                     ApplyLauncherWakeupAtCommit();
    1803              : 
    1804           54 :                 update_tuple = true;
    1805              : 
    1806              :                 /*
    1807              :                  * The subscription might be initially created with
    1808              :                  * connect=false and retain_dead_tuples=true, meaning the
    1809              :                  * remote server's status may not be checked. Ensure this
    1810              :                  * check is conducted now.
    1811              :                  */
    1812           54 :                 check_pub_rdt = sub->retaindeadtuples && opts.enabled;
    1813           54 :                 break;
    1814              :             }
    1815              : 
    1816            1 :         case ALTER_SUBSCRIPTION_SERVER:
    1817              :             {
    1818              :                 ForeignServer *new_server;
    1819              :                 ObjectAddress referenced;
    1820              :                 AclResult   aclresult;
    1821              : 
    1822              :                 /*
    1823              :                  * Remove what was there before, either another foreign server
    1824              :                  * or a connection string.
    1825              :                  */
    1826            1 :                 if (form->subserver)
    1827              :                 {
    1828            0 :                     deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
    1829              :                                                        DEPENDENCY_NORMAL,
    1830              :                                                        ForeignServerRelationId, form->subserver);
    1831              :                 }
    1832              :                 else
    1833              :                 {
    1834            1 :                     nulls[Anum_pg_subscription_subconninfo - 1] = true;
    1835            1 :                     replaces[Anum_pg_subscription_subconninfo - 1] = true;
    1836              :                 }
    1837              : 
    1838              :                 /*
    1839              :                  * Check that the subscription owner has USAGE privileges on
    1840              :                  * the server.
    1841              :                  */
    1842            1 :                 new_server = GetForeignServerByName(stmt->servername, false);
    1843            1 :                 aclresult = object_aclcheck(ForeignServerRelationId,
    1844              :                                             new_server->serverid,
    1845              :                                             form->subowner, ACL_USAGE);
    1846            1 :                 if (aclresult != ACLCHECK_OK)
    1847            0 :                     ereport(ERROR,
    1848              :                             errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1849              :                             errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
    1850              :                                    GetUserNameFromId(form->subowner, false),
    1851              :                                    new_server->servername));
    1852              : 
    1853              :                 /* make sure a user mapping exists */
    1854            1 :                 GetUserMapping(form->subowner, new_server->serverid);
    1855              : 
    1856            1 :                 new_conninfo = ForeignServerConnectionString(form->subowner,
    1857              :                                                              new_server);
    1858              : 
    1859              :                 /* Load the library providing us libpq calls. */
    1860            1 :                 load_file("libpqwalreceiver", false);
    1861              :                 /* Check the connection info string. */
    1862            1 :                 walrcv_check_conninfo(new_conninfo,
    1863              :                                       sub->passwordrequired && !sub->ownersuperuser);
    1864              : 
    1865            1 :                 values[Anum_pg_subscription_subserver - 1] = ObjectIdGetDatum(new_server->serverid);
    1866            1 :                 replaces[Anum_pg_subscription_subserver - 1] = true;
    1867              : 
    1868            1 :                 ObjectAddressSet(referenced, ForeignServerRelationId, new_server->serverid);
    1869            1 :                 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
    1870              : 
    1871            1 :                 update_tuple = true;
    1872              :             }
    1873              : 
    1874              :             /*
    1875              :              * Since the remote server configuration might have changed,
    1876              :              * perform a check to ensure it permits enabling
    1877              :              * retain_dead_tuples.
    1878              :              */
    1879            1 :             check_pub_rdt = sub->retaindeadtuples;
    1880            1 :             break;
    1881              : 
    1882           14 :         case ALTER_SUBSCRIPTION_CONNECTION:
    1883              :             /* remove reference to foreign server and dependencies, if present */
    1884           14 :             if (form->subserver)
    1885              :             {
    1886            1 :                 deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
    1887              :                                                    DEPENDENCY_NORMAL,
    1888              :                                                    ForeignServerRelationId, form->subserver);
    1889              : 
    1890            1 :                 values[Anum_pg_subscription_subserver - 1] = ObjectIdGetDatum(InvalidOid);
    1891            1 :                 replaces[Anum_pg_subscription_subserver - 1] = true;
    1892              :             }
    1893              : 
    1894           14 :             new_conninfo = stmt->conninfo;
    1895              : 
    1896              :             /* Load the library providing us libpq calls. */
    1897           14 :             load_file("libpqwalreceiver", false);
    1898              :             /* Check the connection info string. */
    1899           14 :             walrcv_check_conninfo(new_conninfo,
    1900              :                                   sub->passwordrequired && !sub->ownersuperuser);
    1901              : 
    1902           10 :             values[Anum_pg_subscription_subconninfo - 1] =
    1903           10 :                 CStringGetTextDatum(stmt->conninfo);
    1904           10 :             replaces[Anum_pg_subscription_subconninfo - 1] = true;
    1905           10 :             update_tuple = true;
    1906              : 
    1907              :             /*
    1908              :              * Since the remote server configuration might have changed,
    1909              :              * perform a check to ensure it permits enabling
    1910              :              * retain_dead_tuples.
    1911              :              */
    1912           10 :             check_pub_rdt = sub->retaindeadtuples;
    1913           10 :             break;
    1914              : 
    1915           19 :         case ALTER_SUBSCRIPTION_SET_PUBLICATION:
    1916              :             {
    1917           19 :                 supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
    1918           19 :                 parse_subscription_options(pstate, stmt->options,
    1919              :                                            supported_opts, &opts);
    1920              : 
    1921           19 :                 values[Anum_pg_subscription_subpublications - 1] =
    1922           19 :                     publicationListToArray(stmt->publication);
    1923           19 :                 replaces[Anum_pg_subscription_subpublications - 1] = true;
    1924              : 
    1925           19 :                 update_tuple = true;
    1926              : 
    1927              :                 /* Refresh if user asked us to. */
    1928           19 :                 if (opts.refresh)
    1929              :                 {
    1930           15 :                     if (!sub->enabled)
    1931            0 :                         ereport(ERROR,
    1932              :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1933              :                                  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
    1934              :                                  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
    1935              : 
    1936              :                     /*
    1937              :                      * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
    1938              :                      * why this is not allowed.
    1939              :                      */
    1940           15 :                     if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
    1941            0 :                         ereport(ERROR,
    1942              :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1943              :                                  errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
    1944              :                                  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
    1945              : 
    1946           15 :                     PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
    1947              : 
    1948              :                     /* Make sure refresh sees the new list of publications. */
    1949            7 :                     sub->publications = stmt->publication;
    1950              : 
    1951            7 :                     AlterSubscription_refresh(sub, opts.copy_data,
    1952              :                                               stmt->publication);
    1953              :                 }
    1954              : 
    1955           11 :                 break;
    1956              :             }
    1957              : 
    1958           35 :         case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
    1959              :         case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
    1960              :             {
    1961              :                 List       *publist;
    1962           35 :                 bool        isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
    1963              : 
    1964           35 :                 supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
    1965           35 :                 parse_subscription_options(pstate, stmt->options,
    1966              :                                            supported_opts, &opts);
    1967              : 
    1968           35 :                 publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
    1969           11 :                 values[Anum_pg_subscription_subpublications - 1] =
    1970           11 :                     publicationListToArray(publist);
    1971           11 :                 replaces[Anum_pg_subscription_subpublications - 1] = true;
    1972              : 
    1973           11 :                 update_tuple = true;
    1974              : 
    1975              :                 /* Refresh if user asked us to. */
    1976           11 :                 if (opts.refresh)
    1977              :                 {
    1978              :                     /* We only need to validate user specified publications. */
    1979            3 :                     List       *validate_publications = (isadd) ? stmt->publication : NULL;
    1980              : 
    1981            3 :                     if (!sub->enabled)
    1982            0 :                         ereport(ERROR,
    1983              :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1984              :                                  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
    1985              :                         /* translator: %s is an SQL ALTER command */
    1986              :                                  errhint("Use %s instead.",
    1987              :                                          isadd ?
    1988              :                                          "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
    1989              :                                          "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
    1990              : 
    1991              :                     /*
    1992              :                      * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
    1993              :                      * why this is not allowed.
    1994              :                      */
    1995            3 :                     if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
    1996            0 :                         ereport(ERROR,
    1997              :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1998              :                                  errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
    1999              :                         /* translator: %s is an SQL ALTER command */
    2000              :                                  errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
    2001              :                                          isadd ?
    2002              :                                          "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
    2003              :                                          "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
    2004              : 
    2005            3 :                     PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
    2006              : 
    2007              :                     /* Refresh the new list of publications. */
    2008            3 :                     sub->publications = publist;
    2009              : 
    2010            3 :                     AlterSubscription_refresh(sub, opts.copy_data,
    2011              :                                               validate_publications);
    2012              :                 }
    2013              : 
    2014           11 :                 break;
    2015              :             }
    2016              : 
    2017           37 :         case ALTER_SUBSCRIPTION_REFRESH_PUBLICATION:
    2018              :             {
    2019           37 :                 if (!sub->enabled)
    2020            4 :                     ereport(ERROR,
    2021              :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2022              :                              errmsg("%s is not allowed for disabled subscriptions",
    2023              :                                     "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
    2024              : 
    2025           33 :                 parse_subscription_options(pstate, stmt->options,
    2026              :                                            SUBOPT_COPY_DATA, &opts);
    2027              : 
    2028              :                 /*
    2029              :                  * The subscription option "two_phase" requires that
    2030              :                  * replication has passed the initial table synchronization
    2031              :                  * phase before the two_phase becomes properly enabled.
    2032              :                  *
    2033              :                  * But, having reached this two-phase commit "enabled" state
    2034              :                  * we must not allow any subsequent table initialization to
    2035              :                  * occur. So the ALTER SUBSCRIPTION ... REFRESH PUBLICATION is
    2036              :                  * disallowed when the user had requested two_phase = on mode.
    2037              :                  *
    2038              :                  * The exception to this restriction is when copy_data =
    2039              :                  * false, because when copy_data is false the tablesync will
    2040              :                  * start already in READY state and will exit directly without
    2041              :                  * doing anything.
    2042              :                  *
    2043              :                  * For more details see comments atop worker.c.
    2044              :                  */
    2045           33 :                 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
    2046            0 :                     ereport(ERROR,
    2047              :                             (errcode(ERRCODE_SYNTAX_ERROR),
    2048              :                              errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"),
    2049              :                              errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
    2050              : 
    2051           33 :                 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION");
    2052              : 
    2053           29 :                 AlterSubscription_refresh(sub, opts.copy_data, NULL);
    2054              : 
    2055           28 :                 break;
    2056              :             }
    2057              : 
    2058            2 :         case ALTER_SUBSCRIPTION_REFRESH_SEQUENCES:
    2059              :             {
    2060            2 :                 if (!sub->enabled)
    2061            0 :                     ereport(ERROR,
    2062              :                             errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2063              :                             errmsg("%s is not allowed for disabled subscriptions",
    2064              :                                    "ALTER SUBSCRIPTION ... REFRESH SEQUENCES"));
    2065              : 
    2066            2 :                 AlterSubscription_refresh_seq(sub);
    2067              : 
    2068            2 :                 break;
    2069              :             }
    2070              : 
    2071           15 :         case ALTER_SUBSCRIPTION_SKIP:
    2072              :             {
    2073           15 :                 parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
    2074              : 
    2075              :                 /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
    2076              :                 Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
    2077              : 
    2078              :                 /*
    2079              :                  * If the user sets subskiplsn, we do a sanity check to make
    2080              :                  * sure that the specified LSN is a probable value.
    2081              :                  */
    2082           11 :                 if (XLogRecPtrIsValid(opts.lsn))
    2083              :                 {
    2084              :                     ReplOriginId originid;
    2085              :                     char        originname[NAMEDATALEN];
    2086              :                     XLogRecPtr  remote_lsn;
    2087              : 
    2088            7 :                     ReplicationOriginNameForLogicalRep(subid, InvalidOid,
    2089              :                                                        originname, sizeof(originname));
    2090            7 :                     originid = replorigin_by_name(originname, false);
    2091            7 :                     remote_lsn = replorigin_get_progress(originid, false);
    2092              : 
    2093              :                     /* Check the given LSN is at least a future LSN */
    2094            7 :                     if (XLogRecPtrIsValid(remote_lsn) && opts.lsn < remote_lsn)
    2095            0 :                         ereport(ERROR,
    2096              :                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2097              :                                  errmsg("skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
    2098              :                                         LSN_FORMAT_ARGS(opts.lsn),
    2099              :                                         LSN_FORMAT_ARGS(remote_lsn))));
    2100              :                 }
    2101              : 
    2102           11 :                 values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
    2103           11 :                 replaces[Anum_pg_subscription_subskiplsn - 1] = true;
    2104              : 
    2105           11 :                 update_tuple = true;
    2106           11 :                 break;
    2107              :             }
    2108              : 
    2109            0 :         default:
    2110            0 :             elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
    2111              :                  stmt->kind);
    2112              :     }
    2113              : 
    2114              :     /* Update the catalog if needed. */
    2115          258 :     if (update_tuple)
    2116              :     {
    2117          228 :         tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
    2118              :                                 replaces);
    2119              : 
    2120          228 :         CatalogTupleUpdate(rel, &tup->t_self, tup);
    2121              : 
    2122          228 :         heap_freetuple(tup);
    2123              :     }
    2124              : 
    2125              :     /*
    2126              :      * Try to acquire the connection necessary either for modifying the slot
    2127              :      * or for checking if the remote server permits enabling
    2128              :      * retain_dead_tuples.
    2129              :      *
    2130              :      * This has to be at the end because otherwise if there is an error while
    2131              :      * doing the database operations we won't be able to rollback altered
    2132              :      * slot.
    2133              :      */
    2134          258 :     if (update_failover || update_two_phase || check_pub_rdt)
    2135              :     {
    2136              :         bool        must_use_password;
    2137              :         char       *err;
    2138              :         WalReceiverConn *wrconn;
    2139              : 
    2140              :         /* Load the library providing us libpq calls. */
    2141           13 :         load_file("libpqwalreceiver", false);
    2142              : 
    2143              :         /*
    2144              :          * Try to connect to the publisher, using the new connection string if
    2145              :          * available.
    2146              :          */
    2147           13 :         must_use_password = sub->passwordrequired && !sub->ownersuperuser;
    2148           13 :         wrconn = walrcv_connect(new_conninfo ? new_conninfo : sub->conninfo,
    2149              :                                 true, true, must_use_password, sub->name,
    2150              :                                 &err);
    2151           13 :         if (!wrconn)
    2152            0 :             ereport(ERROR,
    2153              :                     (errcode(ERRCODE_CONNECTION_FAILURE),
    2154              :                      errmsg("subscription \"%s\" could not connect to the publisher: %s",
    2155              :                             sub->name, err)));
    2156              : 
    2157           13 :         PG_TRY();
    2158              :         {
    2159           13 :             if (retain_dead_tuples)
    2160            9 :                 check_pub_dead_tuple_retention(wrconn);
    2161              : 
    2162           13 :             check_publications_origin_tables(wrconn, sub->publications, false,
    2163              :                                              retain_dead_tuples, origin, NULL, 0,
    2164              :                                              sub->name);
    2165              : 
    2166           13 :             if (update_failover || update_two_phase)
    2167            5 :                 walrcv_alter_slot(wrconn, sub->slotname,
    2168              :                                   update_failover ? &opts.failover : NULL,
    2169              :                                   update_two_phase ? &opts.twophase : NULL);
    2170              :         }
    2171            0 :         PG_FINALLY();
    2172              :         {
    2173           13 :             walrcv_disconnect(wrconn);
    2174              :         }
    2175           13 :         PG_END_TRY();
    2176              :     }
    2177              : 
    2178          258 :     table_close(rel, RowExclusiveLock);
    2179              : 
    2180          258 :     InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
    2181              : 
    2182              :     /* Wake up related replication workers to handle this change quickly. */
    2183          258 :     LogicalRepWorkersWakeupAtCommit(subid);
    2184              : 
    2185          258 :     return myself;
    2186              : }
    2187              : 
    2188              : /*
    2189              :  * Drop a subscription
    2190              :  */
    2191              : void
    2192          155 : DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
    2193              : {
    2194              :     Relation    rel;
    2195              :     ObjectAddress myself;
    2196              :     HeapTuple   tup;
    2197              :     Oid         subid;
    2198              :     Oid         subowner;
    2199              :     Datum       datum;
    2200              :     bool        isnull;
    2201              :     char       *subname;
    2202              :     char       *conninfo;
    2203              :     char       *slotname;
    2204              :     List       *subworkers;
    2205              :     ListCell   *lc;
    2206              :     char        originname[NAMEDATALEN];
    2207          155 :     char       *err = NULL;
    2208          155 :     WalReceiverConn *wrconn = NULL;
    2209              :     Form_pg_subscription form;
    2210              :     List       *rstates;
    2211              :     bool        must_use_password;
    2212              : 
    2213              :     /*
    2214              :      * The launcher may concurrently start a new worker for this subscription.
    2215              :      * During initialization, the worker checks for subscription validity and
    2216              :      * exits if the subscription has already been dropped. See
    2217              :      * InitializeLogRepWorker.
    2218              :      */
    2219          155 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    2220              : 
    2221          155 :     tup = SearchSysCache2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
    2222          155 :                           CStringGetDatum(stmt->subname));
    2223              : 
    2224          155 :     if (!HeapTupleIsValid(tup))
    2225              :     {
    2226            8 :         table_close(rel, NoLock);
    2227              : 
    2228            8 :         if (!stmt->missing_ok)
    2229            4 :             ereport(ERROR,
    2230              :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    2231              :                      errmsg("subscription \"%s\" does not exist",
    2232              :                             stmt->subname)));
    2233              :         else
    2234            4 :             ereport(NOTICE,
    2235              :                     (errmsg("subscription \"%s\" does not exist, skipping",
    2236              :                             stmt->subname)));
    2237              : 
    2238           65 :         return;
    2239              :     }
    2240              : 
    2241          147 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    2242          147 :     subid = form->oid;
    2243          147 :     subowner = form->subowner;
    2244          147 :     must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
    2245              : 
    2246              :     /* must be owner */
    2247          147 :     if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
    2248            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
    2249            0 :                        stmt->subname);
    2250              : 
    2251              :     /* DROP hook for the subscription being removed */
    2252          147 :     InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
    2253              : 
    2254              :     /*
    2255              :      * Lock the subscription so nobody else can do anything with it (including
    2256              :      * the replication workers).
    2257              :      */
    2258          147 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
    2259              : 
    2260              :     /* Get subname */
    2261          147 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
    2262              :                                    Anum_pg_subscription_subname);
    2263          147 :     subname = pstrdup(NameStr(*DatumGetName(datum)));
    2264              : 
    2265              :     /* Get conninfo */
    2266          147 :     if (OidIsValid(form->subserver))
    2267              :     {
    2268              :         AclResult   aclresult;
    2269              :         ForeignServer *server;
    2270              : 
    2271            9 :         server = GetForeignServer(form->subserver);
    2272            9 :         aclresult = object_aclcheck(ForeignServerRelationId, form->subserver,
    2273              :                                     form->subowner, ACL_USAGE);
    2274            9 :         if (aclresult != ACLCHECK_OK)
    2275              :         {
    2276              :             /*
    2277              :              * Unable to generate connection string because permissions on the
    2278              :              * foreign server have been removed. Follow the same logic as an
    2279              :              * unusable subconninfo (which will result in an ERROR later
    2280              :              * unless slot_name = NONE).
    2281              :              */
    2282            8 :             err = psprintf(_("subscription owner \"%s\" does not have permission on foreign server \"%s\""),
    2283              :                            GetUserNameFromId(form->subowner, false),
    2284              :                            server->servername);
    2285            8 :             conninfo = NULL;
    2286              :         }
    2287              :         else
    2288            1 :             conninfo = ForeignServerConnectionString(form->subowner,
    2289              :                                                      server);
    2290              :     }
    2291              :     else
    2292              :     {
    2293          138 :         datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
    2294              :                                        Anum_pg_subscription_subconninfo);
    2295          138 :         conninfo = TextDatumGetCString(datum);
    2296              :     }
    2297              : 
    2298              :     /* Get slotname */
    2299          147 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
    2300              :                             Anum_pg_subscription_subslotname, &isnull);
    2301          147 :     if (!isnull)
    2302           86 :         slotname = pstrdup(NameStr(*DatumGetName(datum)));
    2303              :     else
    2304           61 :         slotname = NULL;
    2305              : 
    2306              :     /*
    2307              :      * Since dropping a replication slot is not transactional, the replication
    2308              :      * slot stays dropped even if the transaction rolls back.  So we cannot
    2309              :      * run DROP SUBSCRIPTION inside a transaction block if dropping the
    2310              :      * replication slot.  Also, in this case, we report a message for dropping
    2311              :      * the subscription to the cumulative stats system.
    2312              :      *
    2313              :      * XXX The command name should really be something like "DROP SUBSCRIPTION
    2314              :      * of a subscription that is associated with a replication slot", but we
    2315              :      * don't have the proper facilities for that.
    2316              :      */
    2317          147 :     if (slotname)
    2318           86 :         PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
    2319              : 
    2320          143 :     ObjectAddressSet(myself, SubscriptionRelationId, subid);
    2321          143 :     EventTriggerSQLDropAddObject(&myself, true, true);
    2322              : 
    2323              :     /* Remove the tuple from catalog. */
    2324          143 :     CatalogTupleDelete(rel, &tup->t_self);
    2325              : 
    2326          143 :     ReleaseSysCache(tup);
    2327              : 
    2328              :     /*
    2329              :      * Stop all the subscription workers immediately.
    2330              :      *
    2331              :      * This is necessary if we are dropping the replication slot, so that the
    2332              :      * slot becomes accessible.
    2333              :      *
    2334              :      * It is also necessary if the subscription is disabled and was disabled
    2335              :      * in the same transaction.  Then the workers haven't seen the disabling
    2336              :      * yet and will still be running, leading to hangs later when we want to
    2337              :      * drop the replication origin.  If the subscription was disabled before
    2338              :      * this transaction, then there shouldn't be any workers left, so this
    2339              :      * won't make a difference.
    2340              :      *
    2341              :      * New workers won't be started because we hold an exclusive lock on the
    2342              :      * subscription till the end of the transaction.
    2343              :      */
    2344          143 :     subworkers = logicalrep_workers_find(subid, false, true);
    2345          225 :     foreach(lc, subworkers)
    2346              :     {
    2347           82 :         LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
    2348              : 
    2349           82 :         logicalrep_worker_stop(w->type, w->subid, w->relid);
    2350              :     }
    2351          143 :     list_free(subworkers);
    2352              : 
    2353              :     /*
    2354              :      * Remove the no-longer-useful entry in the launcher's table of apply
    2355              :      * worker start times.
    2356              :      *
    2357              :      * If this transaction rolls back, the launcher might restart a failed
    2358              :      * apply worker before wal_retrieve_retry_interval milliseconds have
    2359              :      * elapsed, but that's pretty harmless.
    2360              :      */
    2361          143 :     ApplyLauncherForgetWorkerStartTime(subid);
    2362              : 
    2363              :     /*
    2364              :      * Cleanup of tablesync replication origins.
    2365              :      *
    2366              :      * Any READY-state relations would already have dealt with clean-ups.
    2367              :      *
    2368              :      * Note that the state can't change because we have already stopped both
    2369              :      * the apply and tablesync workers and they can't restart because of
    2370              :      * exclusive lock on the subscription.
    2371              :      */
    2372          143 :     rstates = GetSubscriptionRelations(subid, true, false, true);
    2373          148 :     foreach(lc, rstates)
    2374              :     {
    2375            5 :         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
    2376            5 :         Oid         relid = rstate->relid;
    2377              : 
    2378              :         /* Only cleanup resources of tablesync workers */
    2379            5 :         if (!OidIsValid(relid))
    2380            0 :             continue;
    2381              : 
    2382              :         /*
    2383              :          * Drop the tablesync's origin tracking if exists.
    2384              :          *
    2385              :          * It is possible that the origin is not yet created for tablesync
    2386              :          * worker so passing missing_ok = true. This can happen for the states
    2387              :          * before SUBREL_STATE_DATASYNC.
    2388              :          */
    2389            5 :         ReplicationOriginNameForLogicalRep(subid, relid, originname,
    2390              :                                            sizeof(originname));
    2391            5 :         replorigin_drop_by_name(originname, true, false);
    2392              :     }
    2393              : 
    2394              :     /* Clean up dependencies */
    2395          143 :     deleteDependencyRecordsFor(SubscriptionRelationId, subid, false);
    2396          143 :     deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
    2397              : 
    2398              :     /* Remove any associated relation synchronization states. */
    2399          143 :     RemoveSubscriptionRel(subid, InvalidOid);
    2400              : 
    2401              :     /* Remove the origin tracking if exists. */
    2402          143 :     ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
    2403          143 :     replorigin_drop_by_name(originname, true, false);
    2404              : 
    2405              :     /*
    2406              :      * Tell the cumulative stats system that the subscription is getting
    2407              :      * dropped.
    2408              :      */
    2409          143 :     pgstat_drop_subscription(subid);
    2410              : 
    2411              :     /*
    2412              :      * If there is no slot associated with the subscription, we can finish
    2413              :      * here.
    2414              :      */
    2415          143 :     if (!slotname && rstates == NIL)
    2416              :     {
    2417           61 :         table_close(rel, NoLock);
    2418           61 :         return;
    2419              :     }
    2420              : 
    2421              :     /*
    2422              :      * Try to acquire the connection necessary for dropping slots.
    2423              :      *
    2424              :      * Note: If the slotname is NONE/NULL then we allow the command to finish
    2425              :      * and users need to manually cleanup the apply and tablesync worker slots
    2426              :      * later.
    2427              :      *
    2428              :      * This has to be at the end because otherwise if there is an error while
    2429              :      * doing the database operations we won't be able to rollback dropped
    2430              :      * slot.
    2431              :      */
    2432           82 :     load_file("libpqwalreceiver", false);
    2433              : 
    2434           82 :     if (conninfo)
    2435           78 :         wrconn = walrcv_connect(conninfo, true, true, must_use_password,
    2436              :                                 subname, &err);
    2437              : 
    2438           82 :     if (wrconn == NULL)
    2439              :     {
    2440            4 :         if (!slotname)
    2441              :         {
    2442              :             /* be tidy */
    2443            0 :             list_free(rstates);
    2444            0 :             table_close(rel, NoLock);
    2445            0 :             return;
    2446              :         }
    2447              :         else
    2448              :         {
    2449            4 :             ReportSlotConnectionError(rstates, subid, slotname, err);
    2450              :         }
    2451              :     }
    2452              : 
    2453           78 :     PG_TRY();
    2454              :     {
    2455           83 :         foreach(lc, rstates)
    2456              :         {
    2457            5 :             SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
    2458            5 :             Oid         relid = rstate->relid;
    2459              : 
    2460              :             /* Only cleanup resources of tablesync workers */
    2461            5 :             if (!OidIsValid(relid))
    2462            0 :                 continue;
    2463              : 
    2464              :             /*
    2465              :              * Drop the tablesync slots associated with removed tables.
    2466              :              *
    2467              :              * For SYNCDONE/READY states, the tablesync slot is known to have
    2468              :              * already been dropped by the tablesync worker.
    2469              :              *
    2470              :              * For other states, there is no certainty, maybe the slot does
    2471              :              * not exist yet. Also, if we fail after removing some of the
    2472              :              * slots, next time, it will again try to drop already dropped
    2473              :              * slots and fail. For these reasons, we allow missing_ok = true
    2474              :              * for the drop.
    2475              :              */
    2476            5 :             if (rstate->state != SUBREL_STATE_SYNCDONE)
    2477              :             {
    2478            4 :                 char        syncslotname[NAMEDATALEN] = {0};
    2479              : 
    2480            4 :                 ReplicationSlotNameForTablesync(subid, relid, syncslotname,
    2481              :                                                 sizeof(syncslotname));
    2482            4 :                 ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
    2483              :             }
    2484              :         }
    2485              : 
    2486           78 :         list_free(rstates);
    2487              : 
    2488              :         /*
    2489              :          * If there is a slot associated with the subscription, then drop the
    2490              :          * replication slot at the publisher.
    2491              :          */
    2492           78 :         if (slotname)
    2493           78 :             ReplicationSlotDropAtPubNode(wrconn, slotname, false);
    2494              :     }
    2495            1 :     PG_FINALLY();
    2496              :     {
    2497           78 :         walrcv_disconnect(wrconn);
    2498              :     }
    2499           78 :     PG_END_TRY();
    2500              : 
    2501           77 :     table_close(rel, NoLock);
    2502              : }
    2503              : 
    2504              : /*
    2505              :  * Drop the replication slot at the publisher node using the replication
    2506              :  * connection.
    2507              :  *
    2508              :  * missing_ok - if true then only issue a LOG message if the slot doesn't
    2509              :  * exist.
    2510              :  */
    2511              : void
    2512          290 : ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
    2513              : {
    2514              :     StringInfoData cmd;
    2515              : 
    2516              :     Assert(wrconn);
    2517              : 
    2518          290 :     load_file("libpqwalreceiver", false);
    2519              : 
    2520          290 :     initStringInfo(&cmd);
    2521          290 :     appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
    2522              : 
    2523          290 :     PG_TRY();
    2524              :     {
    2525              :         WalRcvExecResult *res;
    2526              : 
    2527          290 :         res = walrcv_exec(wrconn, cmd.data, 0, NULL);
    2528              : 
    2529          290 :         if (res->status == WALRCV_OK_COMMAND)
    2530              :         {
    2531              :             /* NOTICE. Success. */
    2532          289 :             ereport(NOTICE,
    2533              :                     (errmsg("dropped replication slot \"%s\" on publisher",
    2534              :                             slotname)));
    2535              :         }
    2536            1 :         else if (res->status == WALRCV_ERROR &&
    2537            0 :                  missing_ok &&
    2538            0 :                  res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
    2539              :         {
    2540              :             /* LOG. Error, but missing_ok = true. */
    2541            0 :             ereport(LOG,
    2542              :                     (errmsg("could not drop replication slot \"%s\" on publisher: %s",
    2543              :                             slotname, res->err)));
    2544              :         }
    2545              :         else
    2546              :         {
    2547              :             /* ERROR. */
    2548            1 :             ereport(ERROR,
    2549              :                     (errcode(ERRCODE_CONNECTION_FAILURE),
    2550              :                      errmsg("could not drop replication slot \"%s\" on publisher: %s",
    2551              :                             slotname, res->err)));
    2552              :         }
    2553              : 
    2554          289 :         walrcv_clear_result(res);
    2555              :     }
    2556            1 :     PG_FINALLY();
    2557              :     {
    2558          290 :         pfree(cmd.data);
    2559              :     }
    2560          290 :     PG_END_TRY();
    2561          289 : }
    2562              : 
    2563              : /*
    2564              :  * Internal workhorse for changing a subscription owner
    2565              :  */
    2566              : static void
    2567           11 : AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
    2568              : {
    2569              :     Form_pg_subscription form;
    2570              :     AclResult   aclresult;
    2571              : 
    2572           11 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    2573              : 
    2574           11 :     if (form->subowner == newOwnerId)
    2575            2 :         return;
    2576              : 
    2577            9 :     if (!object_ownercheck(SubscriptionRelationId, form->oid, GetUserId()))
    2578            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
    2579            0 :                        NameStr(form->subname));
    2580              : 
    2581              :     /*
    2582              :      * Don't allow non-superuser modification of a subscription with
    2583              :      * password_required=false.
    2584              :      */
    2585            9 :     if (!form->subpasswordrequired && !superuser())
    2586            0 :         ereport(ERROR,
    2587              :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    2588              :                  errmsg("password_required=false is superuser-only"),
    2589              :                  errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
    2590              : 
    2591              :     /* Must be able to become new owner */
    2592            9 :     check_can_set_role(GetUserId(), newOwnerId);
    2593              : 
    2594              :     /*
    2595              :      * current owner must have CREATE on database
    2596              :      *
    2597              :      * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
    2598              :      * other object types behave differently (e.g. you can't give a table to a
    2599              :      * user who lacks CREATE privileges on a schema).
    2600              :      */
    2601            5 :     aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
    2602              :                                 GetUserId(), ACL_CREATE);
    2603            5 :     if (aclresult != ACLCHECK_OK)
    2604            0 :         aclcheck_error(aclresult, OBJECT_DATABASE,
    2605            0 :                        get_database_name(MyDatabaseId));
    2606              : 
    2607              :     /*
    2608              :      * If the subscription uses a server, check that the new owner has USAGE
    2609              :      * privileges on the server and that a user mapping exists. Note: does not
    2610              :      * re-check the resulting connection string.
    2611              :      */
    2612            5 :     if (OidIsValid(form->subserver))
    2613              :     {
    2614            0 :         ForeignServer *server = GetForeignServer(form->subserver);
    2615              : 
    2616            0 :         aclresult = object_aclcheck(ForeignServerRelationId, server->serverid, newOwnerId, ACL_USAGE);
    2617            0 :         if (aclresult != ACLCHECK_OK)
    2618            0 :             ereport(ERROR,
    2619              :                     errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    2620              :                     errmsg("new subscription owner \"%s\" does not have permission on foreign server \"%s\"",
    2621              :                            GetUserNameFromId(newOwnerId, false),
    2622              :                            server->servername));
    2623              : 
    2624              :         /* make sure a user mapping exists */
    2625            0 :         GetUserMapping(newOwnerId, server->serverid);
    2626              :     }
    2627              : 
    2628            5 :     form->subowner = newOwnerId;
    2629            5 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    2630              : 
    2631              :     /* Update owner dependency reference */
    2632            5 :     changeDependencyOnOwner(SubscriptionRelationId,
    2633              :                             form->oid,
    2634              :                             newOwnerId);
    2635              : 
    2636            5 :     InvokeObjectPostAlterHook(SubscriptionRelationId,
    2637              :                               form->oid, 0);
    2638              : 
    2639              :     /* Wake up related background processes to handle this change quickly. */
    2640            5 :     ApplyLauncherWakeupAtCommit();
    2641            5 :     LogicalRepWorkersWakeupAtCommit(form->oid);
    2642              : }
    2643              : 
    2644              : /*
    2645              :  * Change subscription owner -- by name
    2646              :  */
    2647              : ObjectAddress
    2648           11 : AlterSubscriptionOwner(const char *name, Oid newOwnerId)
    2649              : {
    2650              :     Oid         subid;
    2651              :     HeapTuple   tup;
    2652              :     Relation    rel;
    2653              :     ObjectAddress address;
    2654              :     Form_pg_subscription form;
    2655              : 
    2656           11 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    2657              : 
    2658           11 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
    2659              :                               CStringGetDatum(name));
    2660              : 
    2661           11 :     if (!HeapTupleIsValid(tup))
    2662            0 :         ereport(ERROR,
    2663              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2664              :                  errmsg("subscription \"%s\" does not exist", name)));
    2665              : 
    2666           11 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    2667           11 :     subid = form->oid;
    2668              : 
    2669           11 :     AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
    2670              : 
    2671            7 :     ObjectAddressSet(address, SubscriptionRelationId, subid);
    2672              : 
    2673            7 :     heap_freetuple(tup);
    2674              : 
    2675            7 :     table_close(rel, RowExclusiveLock);
    2676              : 
    2677            7 :     return address;
    2678              : }
    2679              : 
    2680              : /*
    2681              :  * Change subscription owner -- by OID
    2682              :  */
    2683              : void
    2684            0 : AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
    2685              : {
    2686              :     HeapTuple   tup;
    2687              :     Relation    rel;
    2688              : 
    2689            0 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    2690              : 
    2691            0 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
    2692              : 
    2693            0 :     if (!HeapTupleIsValid(tup))
    2694            0 :         ereport(ERROR,
    2695              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2696              :                  errmsg("subscription with OID %u does not exist", subid)));
    2697              : 
    2698            0 :     AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
    2699              : 
    2700            0 :     heap_freetuple(tup);
    2701              : 
    2702            0 :     table_close(rel, RowExclusiveLock);
    2703            0 : }
    2704              : 
    2705              : /*
    2706              :  * Check and log a warning if the publisher has subscribed to the same table,
    2707              :  * its partition ancestors (if it's a partition), or its partition children (if
    2708              :  * it's a partitioned table), from some other publishers. This check is
    2709              :  * required in the following scenarios:
    2710              :  *
    2711              :  * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
    2712              :  *    statements with "copy_data = true" and "origin = none":
    2713              :  *    - Warn the user that data with an origin might have been copied.
    2714              :  *    - This check is skipped for tables already added, as incremental sync via
    2715              :  *      WAL allows origin tracking. The list of such tables is in
    2716              :  *      subrel_local_oids.
    2717              :  *
    2718              :  * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
    2719              :  *    statements with "retain_dead_tuples = true" and "origin = any", and for
    2720              :  *    ALTER SUBSCRIPTION statements that modify retain_dead_tuples or origin,
    2721              :  *    or when the publisher's status changes (e.g., due to a connection string
    2722              :  *    update):
    2723              :  *    - Warn the user that only conflict detection info for local changes on
    2724              :  *      the publisher is retained. Data from other origins may lack sufficient
    2725              :  *      details for reliable conflict detection.
    2726              :  *    - See comments atop worker.c for more details.
    2727              :  */
    2728              : static void
    2729          177 : check_publications_origin_tables(WalReceiverConn *wrconn, List *publications,
    2730              :                                  bool copydata, bool retain_dead_tuples,
    2731              :                                  char *origin, Oid *subrel_local_oids,
    2732              :                                  int subrel_count, char *subname)
    2733              : {
    2734              :     WalRcvExecResult *res;
    2735              :     StringInfoData cmd;
    2736              :     TupleTableSlot *slot;
    2737          177 :     Oid         tableRow[1] = {TEXTOID};
    2738          177 :     List       *publist = NIL;
    2739              :     int         i;
    2740              :     bool        check_rdt;
    2741              :     bool        check_table_sync;
    2742          354 :     bool        origin_none = origin &&
    2743          177 :         pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0;
    2744              : 
    2745              :     /*
    2746              :      * Enable retain_dead_tuples checks only when origin is set to 'any',
    2747              :      * since with origin='none' only local changes are replicated to the
    2748              :      * subscriber.
    2749              :      */
    2750          177 :     check_rdt = retain_dead_tuples && !origin_none;
    2751              : 
    2752              :     /*
    2753              :      * Enable table synchronization checks only when origin is 'none', to
    2754              :      * ensure that data from other origins is not inadvertently copied.
    2755              :      */
    2756          177 :     check_table_sync = copydata && origin_none;
    2757              : 
    2758              :     /* retain_dead_tuples and table sync checks occur separately */
    2759              :     Assert(!(check_rdt && check_table_sync));
    2760              : 
    2761              :     /* Return if no checks are required */
    2762          177 :     if (!check_rdt && !check_table_sync)
    2763          163 :         return;
    2764              : 
    2765           14 :     initStringInfo(&cmd);
    2766           14 :     appendStringInfoString(&cmd,
    2767              :                            "SELECT DISTINCT P.pubname AS pubname\n"
    2768              :                            "FROM pg_publication P,\n"
    2769              :                            "     LATERAL pg_get_publication_tables(P.pubname) GPT\n"
    2770              :                            "     JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid OR"
    2771              :                            "     GPT.relid IN (SELECT relid FROM pg_partition_ancestors(PS.srrelid) UNION"
    2772              :                            "                   SELECT relid FROM pg_partition_tree(PS.srrelid))),\n"
    2773              :                            "     pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
    2774              :                            "WHERE C.oid = GPT.relid AND P.pubname IN (");
    2775           14 :     GetPublicationsStr(publications, &cmd, true);
    2776           14 :     appendStringInfoString(&cmd, ")\n");
    2777              : 
    2778              :     /*
    2779              :      * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
    2780              :      * subrel_local_oids contains the list of relation oids that are already
    2781              :      * present on the subscriber. This check should be skipped for these
    2782              :      * tables if checking for table sync scenario. However, when handling the
    2783              :      * retain_dead_tuples scenario, ensure all tables are checked, as some
    2784              :      * existing tables may now include changes from other origins due to newly
    2785              :      * created subscriptions on the publisher.
    2786              :      */
    2787           14 :     if (check_table_sync)
    2788              :     {
    2789           14 :         for (i = 0; i < subrel_count; i++)
    2790              :         {
    2791            4 :             Oid         relid = subrel_local_oids[i];
    2792            4 :             char       *schemaname = get_namespace_name(get_rel_namespace(relid));
    2793            4 :             char       *tablename = get_rel_name(relid);
    2794            4 :             char       *schemaname_lit = quote_literal_cstr(schemaname);
    2795            4 :             char       *tablename_lit = quote_literal_cstr(tablename);
    2796              : 
    2797            4 :             appendStringInfo(&cmd, "AND NOT (N.nspname = %s AND C.relname = %s)\n",
    2798              :                              schemaname_lit, tablename_lit);
    2799              : 
    2800            4 :             pfree(schemaname_lit);
    2801            4 :             pfree(tablename_lit);
    2802              :         }
    2803              :     }
    2804              : 
    2805           14 :     res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
    2806           14 :     pfree(cmd.data);
    2807              : 
    2808           14 :     if (res->status != WALRCV_OK_TUPLES)
    2809            0 :         ereport(ERROR,
    2810              :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    2811              :                  errmsg("could not receive list of replicated tables from the publisher: %s",
    2812              :                         res->err)));
    2813              : 
    2814              :     /* Process publications. */
    2815           14 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    2816           19 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    2817              :     {
    2818              :         char       *pubname;
    2819              :         bool        isnull;
    2820              : 
    2821            5 :         pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
    2822              :         Assert(!isnull);
    2823              : 
    2824            5 :         ExecClearTuple(slot);
    2825            5 :         publist = list_append_unique(publist, makeString(pubname));
    2826              :     }
    2827              : 
    2828              :     /*
    2829              :      * Log a warning if the publisher has subscribed to the same table from
    2830              :      * some other publisher. We cannot know the origin of data during the
    2831              :      * initial sync. Data origins can be found only from the WAL by looking at
    2832              :      * the origin id.
    2833              :      *
    2834              :      * XXX: For simplicity, we don't check whether the table has any data or
    2835              :      * not. If the table doesn't have any data then we don't need to
    2836              :      * distinguish between data having origin and data not having origin so we
    2837              :      * can avoid logging a warning for table sync scenario.
    2838              :      */
    2839           14 :     if (publist)
    2840              :     {
    2841              :         StringInfoData pubnames;
    2842              : 
    2843              :         /* Prepare the list of publication(s) for warning message. */
    2844            5 :         initStringInfo(&pubnames);
    2845            5 :         GetPublicationsStr(publist, &pubnames, false);
    2846              : 
    2847            5 :         if (check_table_sync)
    2848            4 :             ereport(WARNING,
    2849              :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2850              :                     errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
    2851              :                            subname),
    2852              :                     errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
    2853              :                                      "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
    2854              :                                      list_length(publist), pubnames.data),
    2855              :                     errhint("Verify that initial data copied from the publisher tables did not come from other origins."));
    2856              :         else
    2857            1 :             ereport(WARNING,
    2858              :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2859              :                     errmsg("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins",
    2860              :                            subname),
    2861              :                     errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
    2862              :                                      "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
    2863              :                                      list_length(publist), pubnames.data),
    2864              :                     errhint("Consider using origin = NONE or disabling retain_dead_tuples."));
    2865              :     }
    2866              : 
    2867           14 :     ExecDropSingleTupleTableSlot(slot);
    2868              : 
    2869           14 :     walrcv_clear_result(res);
    2870              : }
    2871              : 
    2872              : /*
    2873              :  * This function is similar to check_publications_origin_tables and serves
    2874              :  * same purpose for sequences.
    2875              :  */
    2876              : static void
    2877          166 : check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications,
    2878              :                                     bool copydata, char *origin,
    2879              :                                     Oid *subrel_local_oids, int subrel_count,
    2880              :                                     char *subname)
    2881              : {
    2882              :     WalRcvExecResult *res;
    2883              :     StringInfoData cmd;
    2884              :     TupleTableSlot *slot;
    2885          166 :     Oid         tableRow[1] = {TEXTOID};
    2886          166 :     List       *publist = NIL;
    2887              : 
    2888              :     /*
    2889              :      * Enable sequence synchronization checks only when origin is 'none' , to
    2890              :      * ensure that sequence data from other origins is not inadvertently
    2891              :      * copied. This check is necessary if the publisher is running PG19 or
    2892              :      * later, where logical replication sequence synchronization is supported.
    2893              :      */
    2894          176 :     if (!copydata || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0 ||
    2895           10 :         walrcv_server_version(wrconn) < 190000)
    2896          156 :         return;
    2897              : 
    2898           10 :     initStringInfo(&cmd);
    2899           10 :     appendStringInfoString(&cmd,
    2900              :                            "SELECT DISTINCT P.pubname AS pubname\n"
    2901              :                            "FROM pg_publication P,\n"
    2902              :                            "     LATERAL pg_get_publication_sequences(P.pubname) GPS\n"
    2903              :                            "     JOIN pg_subscription_rel PS ON (GPS.relid = PS.srrelid),\n"
    2904              :                            "     pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
    2905              :                            "WHERE C.oid = GPS.relid AND P.pubname IN (");
    2906              : 
    2907           10 :     GetPublicationsStr(publications, &cmd, true);
    2908           10 :     appendStringInfoString(&cmd, ")\n");
    2909              : 
    2910              :     /*
    2911              :      * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
    2912              :      * subrel_local_oids contains the list of relations that are already
    2913              :      * present on the subscriber. This check should be skipped as these will
    2914              :      * not be re-synced.
    2915              :      */
    2916           10 :     for (int i = 0; i < subrel_count; i++)
    2917              :     {
    2918            0 :         Oid         relid = subrel_local_oids[i];
    2919            0 :         char       *schemaname = get_namespace_name(get_rel_namespace(relid));
    2920            0 :         char       *seqname = get_rel_name(relid);
    2921            0 :         char       *schemaname_lit = quote_literal_cstr(schemaname);
    2922            0 :         char       *seqname_lit = quote_literal_cstr(seqname);
    2923              : 
    2924            0 :         appendStringInfo(&cmd,
    2925              :                          "AND NOT (N.nspname = %s AND C.relname = %s)\n",
    2926              :                          schemaname_lit, seqname_lit);
    2927              : 
    2928            0 :         pfree(schemaname_lit);
    2929            0 :         pfree(seqname_lit);
    2930              :     }
    2931              : 
    2932           10 :     res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
    2933           10 :     pfree(cmd.data);
    2934              : 
    2935           10 :     if (res->status != WALRCV_OK_TUPLES)
    2936            0 :         ereport(ERROR,
    2937              :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    2938              :                  errmsg("could not receive list of replicated sequences from the publisher: %s",
    2939              :                         res->err)));
    2940              : 
    2941              :     /* Process publications. */
    2942           10 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    2943           10 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    2944              :     {
    2945              :         char       *pubname;
    2946              :         bool        isnull;
    2947              : 
    2948            0 :         pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
    2949              :         Assert(!isnull);
    2950              : 
    2951            0 :         ExecClearTuple(slot);
    2952            0 :         publist = list_append_unique(publist, makeString(pubname));
    2953              :     }
    2954              : 
    2955              :     /*
    2956              :      * Log a warning if the publisher has subscribed to the same sequence from
    2957              :      * some other publisher. We cannot know the origin of sequences data
    2958              :      * during the initial sync.
    2959              :      */
    2960           10 :     if (publist)
    2961              :     {
    2962              :         StringInfoData pubnames;
    2963              : 
    2964              :         /* Prepare the list of publication(s) for warning message. */
    2965            0 :         initStringInfo(&pubnames);
    2966            0 :         GetPublicationsStr(publist, &pubnames, false);
    2967              : 
    2968            0 :         ereport(WARNING,
    2969              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2970              :                 errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
    2971              :                        subname),
    2972              :                 errdetail_plural("The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.",
    2973              :                                  "The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.",
    2974              :                                  list_length(publist), pubnames.data),
    2975              :                 errhint("Verify that initial data copied from the publisher sequences did not come from other origins."));
    2976              :     }
    2977              : 
    2978           10 :     ExecDropSingleTupleTableSlot(slot);
    2979              : 
    2980           10 :     walrcv_clear_result(res);
    2981              : }
    2982              : 
    2983              : /*
    2984              :  * Determine whether the retain_dead_tuples can be enabled based on the
    2985              :  * publisher's status.
    2986              :  *
    2987              :  * This option is disallowed if the publisher is running a version earlier
    2988              :  * than the PG19, or if the publisher is in recovery (i.e., it is a standby
    2989              :  * server).
    2990              :  *
    2991              :  * See comments atop worker.c for a detailed explanation.
    2992              :  */
    2993              : static void
    2994           12 : check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
    2995              : {
    2996              :     WalRcvExecResult *res;
    2997           12 :     Oid         RecoveryRow[1] = {BOOLOID};
    2998              :     TupleTableSlot *slot;
    2999              :     bool        isnull;
    3000              :     bool        remote_in_recovery;
    3001              : 
    3002           12 :     if (walrcv_server_version(wrconn) < 190000)
    3003            0 :         ereport(ERROR,
    3004              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3005              :                 errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
    3006              : 
    3007           12 :     res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow);
    3008              : 
    3009           12 :     if (res->status != WALRCV_OK_TUPLES)
    3010            0 :         ereport(ERROR,
    3011              :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    3012              :                  errmsg("could not obtain recovery progress from the publisher: %s",
    3013              :                         res->err)));
    3014              : 
    3015           12 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    3016           12 :     if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    3017            0 :         elog(ERROR, "failed to fetch tuple for the recovery progress");
    3018              : 
    3019           12 :     remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull));
    3020              : 
    3021           12 :     if (remote_in_recovery)
    3022            0 :         ereport(ERROR,
    3023              :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3024              :                 errmsg("cannot enable retain_dead_tuples if the publisher is in recovery"));
    3025              : 
    3026           12 :     ExecDropSingleTupleTableSlot(slot);
    3027              : 
    3028           12 :     walrcv_clear_result(res);
    3029           12 : }
    3030              : 
    3031              : /*
    3032              :  * Check if the subscriber's configuration is adequate to enable the
    3033              :  * retain_dead_tuples option.
    3034              :  *
    3035              :  * Issue an ERROR if the wal_level does not support the use of replication
    3036              :  * slots when check_guc is set to true.
    3037              :  *
    3038              :  * Issue a WARNING if track_commit_timestamp is not enabled when check_guc is
    3039              :  * set to true. This is only to highlight the importance of enabling
    3040              :  * track_commit_timestamp instead of catching all the misconfigurations, as
    3041              :  * this setting can be adjusted after subscription creation. Without it, the
    3042              :  * apply worker will simply skip conflict detection.
    3043              :  *
    3044              :  * Issue a WARNING or NOTICE if the subscription is disabled and the retention
    3045              :  * is active. Do not raise an ERROR since users can only modify
    3046              :  * retain_dead_tuples for disabled subscriptions. And as long as the
    3047              :  * subscription is enabled promptly, it will not pose issues.
    3048              :  *
    3049              :  * Issue a NOTICE to inform users that max_retention_duration is
    3050              :  * ineffective when retain_dead_tuples is disabled for a subscription. An ERROR
    3051              :  * is not issued because setting max_retention_duration causes no harm,
    3052              :  * even when it is ineffective.
    3053              :  */
    3054              : void
    3055          288 : CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
    3056              :                            int elevel_for_sub_disabled,
    3057              :                            bool retain_dead_tuples, bool retention_active,
    3058              :                            bool max_retention_set)
    3059              : {
    3060              :     Assert(elevel_for_sub_disabled == NOTICE ||
    3061              :            elevel_for_sub_disabled == WARNING);
    3062              : 
    3063          288 :     if (retain_dead_tuples)
    3064              :     {
    3065           17 :         if (check_guc && wal_level < WAL_LEVEL_REPLICA)
    3066            0 :             ereport(ERROR,
    3067              :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3068              :                     errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
    3069              :                     errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
    3070              : 
    3071           17 :         if (check_guc && !track_commit_timestamp)
    3072            4 :             ereport(WARNING,
    3073              :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3074              :                     errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
    3075              :                     errhint("Consider setting \"%s\" to true.",
    3076              :                             "track_commit_timestamp"));
    3077              : 
    3078           17 :         if (sub_disabled && retention_active)
    3079            7 :             ereport(elevel_for_sub_disabled,
    3080              :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3081              :                     errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
    3082              :                     (elevel_for_sub_disabled > NOTICE)
    3083              :                     ? errhint("Consider setting %s to false.",
    3084              :                               "retain_dead_tuples") : 0);
    3085              :     }
    3086          271 :     else if (max_retention_set)
    3087              :     {
    3088            4 :         ereport(NOTICE,
    3089              :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3090              :                 errmsg("max_retention_duration is ineffective when retain_dead_tuples is disabled"));
    3091              :     }
    3092          288 : }
    3093              : 
    3094              : /*
    3095              :  * Return true iff 'rv' is a member of the list.
    3096              :  */
    3097              : static bool
    3098          287 : list_member_rangevar(const List *list, RangeVar *rv)
    3099              : {
    3100         1044 :     foreach_ptr(PublicationRelKind, relinfo, list)
    3101              :     {
    3102          472 :         if (equal(relinfo->rv, rv))
    3103            1 :             return true;
    3104              :     }
    3105              : 
    3106          286 :     return false;
    3107              : }
    3108              : 
    3109              : /*
    3110              :  * Get the list of tables and sequences which belong to specified publications
    3111              :  * on the publisher connection.
    3112              :  *
    3113              :  * Note that we don't support the case where the column list is different for
    3114              :  * the same table in different publications to avoid sending unwanted column
    3115              :  * information for some of the rows. This can happen when both the column
    3116              :  * list and row filter are specified for different publications.
    3117              :  */
    3118              : static List *
    3119          164 : fetch_relation_list(WalReceiverConn *wrconn, List *publications)
    3120              : {
    3121              :     WalRcvExecResult *res;
    3122              :     StringInfoData cmd;
    3123              :     TupleTableSlot *slot;
    3124          164 :     Oid         tableRow[4] = {TEXTOID, TEXTOID, CHAROID, InvalidOid};
    3125          164 :     List       *relationlist = NIL;
    3126          164 :     int         server_version = walrcv_server_version(wrconn);
    3127          164 :     bool        check_columnlist = (server_version >= 150000);
    3128          164 :     int         column_count = check_columnlist ? 4 : 3;
    3129              :     StringInfoData pub_names;
    3130              : 
    3131          164 :     initStringInfo(&cmd);
    3132          164 :     initStringInfo(&pub_names);
    3133              : 
    3134              :     /* Build the pub_names comma-separated string. */
    3135          164 :     GetPublicationsStr(publications, &pub_names, true);
    3136              : 
    3137              :     /* Get the list of relations from the publisher */
    3138          164 :     if (server_version >= 160000)
    3139              :     {
    3140          164 :         tableRow[3] = INT2VECTOROID;
    3141              : 
    3142              :         /*
    3143              :          * From version 16, we allowed passing multiple publications to the
    3144              :          * function pg_get_publication_tables. This helped to filter out the
    3145              :          * partition table whose ancestor is also published in this
    3146              :          * publication array.
    3147              :          *
    3148              :          * Join pg_get_publication_tables with pg_publication to exclude
    3149              :          * non-existing publications.
    3150              :          *
    3151              :          * Note that attrs are always stored in sorted order so we don't need
    3152              :          * to worry if different publications have specified them in a
    3153              :          * different order. See pub_collist_validate.
    3154              :          */
    3155          164 :         appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, c.relkind, gpt.attrs\n"
    3156              :                          "   FROM pg_class c\n"
    3157              :                          "         JOIN pg_namespace n ON n.oid = c.relnamespace\n"
    3158              :                          "         JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
    3159              :                          "                FROM pg_publication\n"
    3160              :                          "                WHERE pubname IN ( %s )) AS gpt\n"
    3161              :                          "             ON gpt.relid = c.oid\n",
    3162              :                          pub_names.data);
    3163              : 
    3164              :         /* From version 19, inclusion of sequences in the target is supported */
    3165          164 :         if (server_version >= 190000)
    3166          164 :             appendStringInfo(&cmd,
    3167              :                              "UNION ALL\n"
    3168              :                              "  SELECT DISTINCT s.schemaname, s.sequencename, " CppAsString2(RELKIND_SEQUENCE) "::\"char\" AS relkind, NULL::int2vector AS attrs\n"
    3169              :                              "  FROM pg_catalog.pg_publication_sequences s\n"
    3170              :                              "  WHERE s.pubname IN ( %s )",
    3171              :                              pub_names.data);
    3172              :     }
    3173              :     else
    3174              :     {
    3175            0 :         tableRow[3] = NAMEARRAYOID;
    3176            0 :         appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, " CppAsString2(RELKIND_RELATION) "::\"char\" AS relkind \n");
    3177              : 
    3178              :         /* Get column lists for each relation if the publisher supports it */
    3179            0 :         if (check_columnlist)
    3180            0 :             appendStringInfoString(&cmd, ", t.attnames\n");
    3181              : 
    3182            0 :         appendStringInfo(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
    3183              :                          " WHERE t.pubname IN ( %s )",
    3184              :                          pub_names.data);
    3185              :     }
    3186              : 
    3187          164 :     pfree(pub_names.data);
    3188              : 
    3189          164 :     res = walrcv_exec(wrconn, cmd.data, column_count, tableRow);
    3190          164 :     pfree(cmd.data);
    3191              : 
    3192          164 :     if (res->status != WALRCV_OK_TUPLES)
    3193            0 :         ereport(ERROR,
    3194              :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    3195              :                  errmsg("could not receive list of replicated tables from the publisher: %s",
    3196              :                         res->err)));
    3197              : 
    3198              :     /* Process tables. */
    3199          164 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    3200          467 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    3201              :     {
    3202              :         char       *nspname;
    3203              :         char       *relname;
    3204              :         bool        isnull;
    3205              :         char        relkind;
    3206          304 :         PublicationRelKind *relinfo = palloc_object(PublicationRelKind);
    3207              : 
    3208          304 :         nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
    3209              :         Assert(!isnull);
    3210          304 :         relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
    3211              :         Assert(!isnull);
    3212          304 :         relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
    3213              :         Assert(!isnull);
    3214              : 
    3215          304 :         relinfo->rv = makeRangeVar(nspname, relname, -1);
    3216          304 :         relinfo->relkind = relkind;
    3217              : 
    3218          304 :         if (relkind != RELKIND_SEQUENCE &&
    3219          287 :             check_columnlist &&
    3220          287 :             list_member_rangevar(relationlist, relinfo->rv))
    3221            1 :             ereport(ERROR,
    3222              :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3223              :                     errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
    3224              :                            nspname, relname));
    3225              :         else
    3226          303 :             relationlist = lappend(relationlist, relinfo);
    3227              : 
    3228          303 :         ExecClearTuple(slot);
    3229              :     }
    3230          163 :     ExecDropSingleTupleTableSlot(slot);
    3231              : 
    3232          163 :     walrcv_clear_result(res);
    3233              : 
    3234          163 :     return relationlist;
    3235              : }
    3236              : 
    3237              : /*
    3238              :  * This is to report the connection failure while dropping replication slots.
    3239              :  * Here, we report the WARNING for all tablesync slots so that user can drop
    3240              :  * them manually, if required.
    3241              :  */
    3242              : static void
    3243            4 : ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
    3244              : {
    3245              :     ListCell   *lc;
    3246              : 
    3247            4 :     foreach(lc, rstates)
    3248              :     {
    3249            0 :         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
    3250            0 :         Oid         relid = rstate->relid;
    3251              : 
    3252              :         /* Only cleanup resources of tablesync workers */
    3253            0 :         if (!OidIsValid(relid))
    3254            0 :             continue;
    3255              : 
    3256              :         /*
    3257              :          * Caller needs to ensure that relstate doesn't change underneath us.
    3258              :          * See DropSubscription where we get the relstates.
    3259              :          */
    3260            0 :         if (rstate->state != SUBREL_STATE_SYNCDONE)
    3261              :         {
    3262            0 :             char        syncslotname[NAMEDATALEN] = {0};
    3263              : 
    3264            0 :             ReplicationSlotNameForTablesync(subid, relid, syncslotname,
    3265              :                                             sizeof(syncslotname));
    3266            0 :             elog(WARNING, "could not drop tablesync replication slot \"%s\"",
    3267              :                  syncslotname);
    3268              :         }
    3269              :     }
    3270              : 
    3271            4 :     ereport(ERROR,
    3272              :             (errcode(ERRCODE_CONNECTION_FAILURE),
    3273              :              errmsg("could not connect to publisher when attempting to drop replication slot \"%s\": %s",
    3274              :                     slotname, err),
    3275              :     /* translator: %s is an SQL ALTER command */
    3276              :              errhint("Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
    3277              :                      "ALTER SUBSCRIPTION ... DISABLE",
    3278              :                      "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
    3279              : }
    3280              : 
    3281              : /*
    3282              :  * Check for duplicates in the given list of publications and error out if
    3283              :  * found one.  Add publications to datums as text datums, if datums is not
    3284              :  * NULL.
    3285              :  */
    3286              : static void
    3287          264 : check_duplicates_in_publist(List *publist, Datum *datums)
    3288              : {
    3289              :     ListCell   *cell;
    3290          264 :     int         j = 0;
    3291              : 
    3292          602 :     foreach(cell, publist)
    3293              :     {
    3294          350 :         char       *name = strVal(lfirst(cell));
    3295              :         ListCell   *pcell;
    3296              : 
    3297          513 :         foreach(pcell, publist)
    3298              :         {
    3299          513 :             char       *pname = strVal(lfirst(pcell));
    3300              : 
    3301          513 :             if (pcell == cell)
    3302          338 :                 break;
    3303              : 
    3304          175 :             if (strcmp(name, pname) == 0)
    3305           12 :                 ereport(ERROR,
    3306              :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
    3307              :                          errmsg("publication name \"%s\" used more than once",
    3308              :                                 pname)));
    3309              :         }
    3310              : 
    3311          338 :         if (datums)
    3312          282 :             datums[j++] = CStringGetTextDatum(name);
    3313              :     }
    3314          252 : }
    3315              : 
    3316              : /*
    3317              :  * Merge current subscription's publications and user-specified publications
    3318              :  * from ADD/DROP PUBLICATIONS.
    3319              :  *
    3320              :  * If addpub is true, we will add the list of publications into oldpublist.
    3321              :  * Otherwise, we will delete the list of publications from oldpublist.  The
    3322              :  * returned list is a copy, oldpublist itself is not changed.
    3323              :  *
    3324              :  * subname is the subscription name, for error messages.
    3325              :  */
    3326              : static List *
    3327           35 : merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
    3328              : {
    3329              :     ListCell   *lc;
    3330              : 
    3331           35 :     oldpublist = list_copy(oldpublist);
    3332              : 
    3333           35 :     check_duplicates_in_publist(newpublist, NULL);
    3334              : 
    3335           59 :     foreach(lc, newpublist)
    3336              :     {
    3337           44 :         char       *name = strVal(lfirst(lc));
    3338              :         ListCell   *lc2;
    3339           44 :         bool        found = false;
    3340              : 
    3341           86 :         foreach(lc2, oldpublist)
    3342              :         {
    3343           71 :             char       *pubname = strVal(lfirst(lc2));
    3344              : 
    3345           71 :             if (strcmp(name, pubname) == 0)
    3346              :             {
    3347           29 :                 found = true;
    3348           29 :                 if (addpub)
    3349            8 :                     ereport(ERROR,
    3350              :                             (errcode(ERRCODE_DUPLICATE_OBJECT),
    3351              :                              errmsg("publication \"%s\" is already in subscription \"%s\"",
    3352              :                                     name, subname)));
    3353              :                 else
    3354           21 :                     oldpublist = foreach_delete_current(oldpublist, lc2);
    3355              : 
    3356           21 :                 break;
    3357              :             }
    3358              :         }
    3359              : 
    3360           36 :         if (addpub && !found)
    3361           11 :             oldpublist = lappend(oldpublist, makeString(name));
    3362           25 :         else if (!addpub && !found)
    3363            4 :             ereport(ERROR,
    3364              :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    3365              :                      errmsg("publication \"%s\" is not in subscription \"%s\"",
    3366              :                             name, subname)));
    3367              :     }
    3368              : 
    3369              :     /*
    3370              :      * XXX Probably no strong reason for this, but for now it's to make ALTER
    3371              :      * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
    3372              :      */
    3373           15 :     if (!oldpublist)
    3374            4 :         ereport(ERROR,
    3375              :                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    3376              :                  errmsg("cannot drop all the publications from a subscription")));
    3377              : 
    3378           11 :     return oldpublist;
    3379              : }
    3380              : 
    3381              : /*
    3382              :  * Extract the streaming mode value from a DefElem.  This is like
    3383              :  * defGetBoolean() but also accepts the special value of "parallel".
    3384              :  */
    3385              : char
    3386          485 : defGetStreamingMode(DefElem *def)
    3387              : {
    3388              :     /*
    3389              :      * If no parameter value given, assume "true" is meant.
    3390              :      */
    3391          485 :     if (!def->arg)
    3392            0 :         return LOGICALREP_STREAM_ON;
    3393              : 
    3394              :     /*
    3395              :      * Allow 0, 1, "false", "true", "off", "on" or "parallel".
    3396              :      */
    3397          485 :     switch (nodeTag(def->arg))
    3398              :     {
    3399            0 :         case T_Integer:
    3400            0 :             switch (intVal(def->arg))
    3401              :             {
    3402            0 :                 case 0:
    3403            0 :                     return LOGICALREP_STREAM_OFF;
    3404            0 :                 case 1:
    3405            0 :                     return LOGICALREP_STREAM_ON;
    3406            0 :                 default:
    3407              :                     /* otherwise, error out below */
    3408            0 :                     break;
    3409              :             }
    3410            0 :             break;
    3411          485 :         default:
    3412              :             {
    3413          485 :                 char       *sval = defGetString(def);
    3414              : 
    3415              :                 /*
    3416              :                  * The set of strings accepted here should match up with the
    3417              :                  * grammar's opt_boolean_or_string production.
    3418              :                  */
    3419          966 :                 if (pg_strcasecmp(sval, "false") == 0 ||
    3420          481 :                     pg_strcasecmp(sval, "off") == 0)
    3421            7 :                     return LOGICALREP_STREAM_OFF;
    3422          944 :                 if (pg_strcasecmp(sval, "true") == 0 ||
    3423          466 :                     pg_strcasecmp(sval, "on") == 0)
    3424           49 :                     return LOGICALREP_STREAM_ON;
    3425          429 :                 if (pg_strcasecmp(sval, "parallel") == 0)
    3426          425 :                     return LOGICALREP_STREAM_PARALLEL;
    3427              :             }
    3428            4 :             break;
    3429              :     }
    3430              : 
    3431            4 :     ereport(ERROR,
    3432              :             (errcode(ERRCODE_SYNTAX_ERROR),
    3433              :              errmsg("%s requires a Boolean value or \"parallel\"",
    3434              :                     def->defname)));
    3435              :     return LOGICALREP_STREAM_OFF;   /* keep compiler quiet */
    3436              : }
        

Generated by: LCOV version 2.0-1