|           Line data    Source code 
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * subscriptioncmds.c
       4             :  *      subscription catalog manipulation functions
       5             :  *
       6             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      src/backend/commands/subscriptioncmds.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/commit_ts.h"
      18             : #include "access/htup_details.h"
      19             : #include "access/table.h"
      20             : #include "access/twophase.h"
      21             : #include "access/xact.h"
      22             : #include "catalog/catalog.h"
      23             : #include "catalog/dependency.h"
      24             : #include "catalog/indexing.h"
      25             : #include "catalog/namespace.h"
      26             : #include "catalog/objectaccess.h"
      27             : #include "catalog/objectaddress.h"
      28             : #include "catalog/pg_authid_d.h"
      29             : #include "catalog/pg_database_d.h"
      30             : #include "catalog/pg_subscription.h"
      31             : #include "catalog/pg_subscription_rel.h"
      32             : #include "catalog/pg_type.h"
      33             : #include "commands/defrem.h"
      34             : #include "commands/event_trigger.h"
      35             : #include "commands/subscriptioncmds.h"
      36             : #include "executor/executor.h"
      37             : #include "miscadmin.h"
      38             : #include "nodes/makefuncs.h"
      39             : #include "pgstat.h"
      40             : #include "replication/logicallauncher.h"
      41             : #include "replication/logicalworker.h"
      42             : #include "replication/origin.h"
      43             : #include "replication/slot.h"
      44             : #include "replication/walreceiver.h"
      45             : #include "replication/walsender.h"
      46             : #include "replication/worker_internal.h"
      47             : #include "storage/lmgr.h"
      48             : #include "utils/acl.h"
      49             : #include "utils/builtins.h"
      50             : #include "utils/guc.h"
      51             : #include "utils/lsyscache.h"
      52             : #include "utils/memutils.h"
      53             : #include "utils/pg_lsn.h"
      54             : #include "utils/syscache.h"
      55             : 
      56             : /*
      57             :  * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
      58             :  * command.  Each flag is a distinct bit, so flags can be OR'ed together into a bitmap (see supported_opts / specified_opts in parse_subscription_options).
      59             :  */
      60             : #define SUBOPT_CONNECT              0x00000001  /* "connect" */
      61             : #define SUBOPT_ENABLED              0x00000002  /* "enabled" */
      62             : #define SUBOPT_CREATE_SLOT          0x00000004  /* "create_slot" */
      63             : #define SUBOPT_SLOT_NAME            0x00000008  /* "slot_name" */
      64             : #define SUBOPT_COPY_DATA            0x00000010  /* "copy_data" */
      65             : #define SUBOPT_SYNCHRONOUS_COMMIT   0x00000020  /* "synchronous_commit" */
      66             : #define SUBOPT_REFRESH              0x00000040  /* "refresh" */
      67             : #define SUBOPT_BINARY               0x00000080  /* "binary" */
      68             : #define SUBOPT_STREAMING            0x00000100  /* "streaming" */
      69             : #define SUBOPT_TWOPHASE_COMMIT      0x00000200  /* "two_phase" */
      70             : #define SUBOPT_DISABLE_ON_ERR       0x00000400  /* "disable_on_error" */
      71             : #define SUBOPT_PASSWORD_REQUIRED    0x00000800  /* "password_required" */
      72             : #define SUBOPT_RUN_AS_OWNER         0x00001000  /* "run_as_owner" */
      73             : #define SUBOPT_FAILOVER             0x00002000  /* "failover" */
      74             : #define SUBOPT_RETAIN_DEAD_TUPLES   0x00004000  /* "retain_dead_tuples" */
      75             : #define SUBOPT_MAX_RETENTION_DURATION   0x00008000  /* "max_retention_duration" */
      76             : #define SUBOPT_LSN                  0x00010000  /* "lsn" */
      77             : #define SUBOPT_ORIGIN               0x00020000  /* "origin" */
      78             : 
      79             : /* check if the 'val' has 'bits' set: true only when ALL bits of 'bits' are set in 'val' (a multi-flag mask requires every flag); 'bits' is expanded twice, so it must be free of side effects */
      80             : #define IsSet(val, bits)  (((val) & (bits)) == (bits))
      81             : 
      82             : /*
      83             :  * Structure to hold a bitmap representing the user-provided CREATE/ALTER
      84             :  * SUBSCRIPTION command options and the parsed/default values of each of them.  Filled in by parse_subscription_options(), which zeroes it first; defaults noted below apply only when the option is supported by the caller.
      85             :  */
      86             : typedef struct SubOpts
      87             : {
      88             :     bits32      specified_opts; /* SUBOPT_* bits of options explicitly given by the user */
      89             :     char       *slot_name;  /* NULL if not given or given as "none" */
      90             :     char       *synchronous_commit; /* value as given; NULL if not specified */
      91             :     bool        connect;    /* default true */
      92             :     bool        enabled;    /* default true; forced false when connect = false */
      93             :     bool        create_slot;    /* default true; forced false when connect = false */
      94             :     bool        copy_data;  /* default true; forced false when connect = false */
      95             :     bool        refresh;    /* default true */
      96             :     bool        binary;     /* default false */
      97             :     char        streaming;  /* from defGetStreamingMode(); default LOGICALREP_STREAM_PARALLEL */
      98             :     bool        twophase;   /* "two_phase"; default false */
      99             :     bool        disableonerr;   /* "disable_on_error"; default false */
     100             :     bool        passwordrequired;   /* "password_required"; default true */
     101             :     bool        runasowner; /* "run_as_owner"; default false */
     102             :     bool        failover;   /* default false */
     103             :     bool        retaindeadtuples;   /* "retain_dead_tuples"; default false */
     104             :     int32       maxretention;   /* "max_retention_duration"; default 0 */
     105             :     char       *origin;     /* default is a pstrdup'd copy of LOGICALREP_ORIGIN_ANY */
     106             :     XLogRecPtr  lsn;        /* InvalidXLogRecPtr when given as "none" */
     107             : } SubOpts;
     108             : 
     109             : /*
     110             :  * PublicationRelKind represents a relation included in a publication.
     111             :  * It stores the schema-qualified relation name (rv) and its kind (relkind).
     112             :  */
     113             : typedef struct PublicationRelKind
     114             : {
     115             :     RangeVar   *rv;         /* schema-qualified name of the relation */
     116             :     char        relkind;    /* kind of the relation; presumably a pg_class relkind code -- TODO confirm */
     117             : } PublicationRelKind;
     118             : /* Prototypes for file-local (static) helper functions. */
     119             : static List *fetch_relation_list(WalReceiverConn *wrconn, List *publications);
     120             : static void check_publications_origin_tables(WalReceiverConn *wrconn,
     121             :                                              List *publications, bool copydata,
     122             :                                              bool retain_dead_tuples,
     123             :                                              char *origin,
     124             :                                              Oid *subrel_local_oids,
     125             :                                              int subrel_count, char *subname);
     126             : static void check_publications_origin_sequences(WalReceiverConn *wrconn,
     127             :                                                 List *publications,
     128             :                                                 bool copydata, char *origin,
     129             :                                                 Oid *subrel_local_oids,
     130             :                                                 int subrel_count,
     131             :                                                 char *subname);
     132             : static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn);
     133             : static void check_duplicates_in_publist(List *publist, Datum *datums);
     134             : static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
     135             : static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
     136             : static void CheckAlterSubOption(Subscription *sub, const char *option,
     137             :                                 bool slot_needs_update, bool isTopLevel);
     138             : 
     139             : 
     140             : /*
     141             :  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
     142             :  * Zeroes *opts, fills in defaults for every option in 'supported_opts' (a SUBOPT_* bitmap), then walks 'stmt_options' (a list of DefElem), storing each value in *opts and setting its bit in opts->specified_opts.
     143             :  * Since not all options can be specified in both commands, an option outside 'supported_opts' is reported as unrecognized, and a repeated option is reported via errorConflictingDefElem(defel, pstate).  This function
     144             :  * will also report an error if mutually exclusive options are specified (see the connect = false and slot_name = NONE checks after the parse loop).
     145             :  */
     146             : static void
     147         970 : parse_subscription_options(ParseState *pstate, List *stmt_options,
     148             :                            bits32 supported_opts, SubOpts *opts)
     149             : {
     150             :     ListCell   *lc;
     151             : 
     152             :     /* Start out with cleared opts. */
     153         970 :     memset(opts, 0, sizeof(SubOpts));
     154             : 
     155             :     /* caller must expect some option */
     156             :     Assert(supported_opts != 0);
     157             : 
     158             :     /* If connect option is supported, these others also need to be. */
     159             :     Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
     160             :            IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
     161             :                  SUBOPT_COPY_DATA));
     162             : 
     163             :     /* Set default values for the supported options; unsupported ones keep the zeroed value from the memset above. */
     164         970 :     if (IsSet(supported_opts, SUBOPT_CONNECT))
     165         482 :         opts->connect = true;
     166         970 :     if (IsSet(supported_opts, SUBOPT_ENABLED))
     167         586 :         opts->enabled = true;
     168         970 :     if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
     169         482 :         opts->create_slot = true;
     170         970 :     if (IsSet(supported_opts, SUBOPT_COPY_DATA))
     171         624 :         opts->copy_data = true;
     172         970 :     if (IsSet(supported_opts, SUBOPT_REFRESH))
     173          86 :         opts->refresh = true;
     174         970 :     if (IsSet(supported_opts, SUBOPT_BINARY))
     175         700 :         opts->binary = false;
     176         970 :     if (IsSet(supported_opts, SUBOPT_STREAMING))
     177         700 :         opts->streaming = LOGICALREP_STREAM_PARALLEL;
     178         970 :     if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
     179         700 :         opts->twophase = false;
     180         970 :     if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
     181         700 :         opts->disableonerr = false;
     182         970 :     if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED))
     183         700 :         opts->passwordrequired = true;
     184         970 :     if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER))
     185         700 :         opts->runasowner = false;
     186         970 :     if (IsSet(supported_opts, SUBOPT_FAILOVER))
     187         700 :         opts->failover = false;
     188         970 :     if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES))
     189         700 :         opts->retaindeadtuples = false;
     190         970 :     if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION))
     191         700 :         opts->maxretention = 0;
     192         970 :     if (IsSet(supported_opts, SUBOPT_ORIGIN))
     193         700 :         opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);  /* allocated copy: it is pfree'd below if the user supplies "origin" */
     194             : 
     195             :     /* Parse options: a branch matches only if its option is supported; it rejects a repeat, then records the flag and the value. */
     196        1920 :     foreach(lc, stmt_options)
     197             :     {
     198        1016 :         DefElem    *defel = (DefElem *) lfirst(lc);
     199             : 
     200        1016 :         if (IsSet(supported_opts, SUBOPT_CONNECT) &&
     201         592 :             strcmp(defel->defname, "connect") == 0)
     202             :         {
     203         190 :             if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
     204           0 :                 errorConflictingDefElem(defel, pstate);
     205             : 
     206         190 :             opts->specified_opts |= SUBOPT_CONNECT;
     207         190 :             opts->connect = defGetBoolean(defel);
     208             :         }
     209         826 :         else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
     210         506 :                  strcmp(defel->defname, "enabled") == 0)
     211             :         {
     212         142 :             if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
     213           0 :                 errorConflictingDefElem(defel, pstate);
     214             : 
     215         142 :             opts->specified_opts |= SUBOPT_ENABLED;
     216         142 :             opts->enabled = defGetBoolean(defel);
     217             :         }
     218         684 :         else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
     219         364 :                  strcmp(defel->defname, "create_slot") == 0)
     220             :         {
     221          40 :             if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
     222           0 :                 errorConflictingDefElem(defel, pstate);
     223             : 
     224          40 :             opts->specified_opts |= SUBOPT_CREATE_SLOT;
     225          40 :             opts->create_slot = defGetBoolean(defel);
     226             :         }
     227         644 :         else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
     228         546 :                  strcmp(defel->defname, "slot_name") == 0)
     229             :         {
     230         156 :             if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
     231           0 :                 errorConflictingDefElem(defel, pstate);
     232             : 
     233         156 :             opts->specified_opts |= SUBOPT_SLOT_NAME;
     234         156 :             opts->slot_name = defGetString(defel);
     235             : 
     236             :             /* Setting slot_name = NONE is treated as no slot name (exact, case-sensitive match on "none"); any other value is validated as a slot name. */
     237         156 :             if (strcmp(opts->slot_name, "none") == 0)
     238         122 :                 opts->slot_name = NULL;
     239             :             else
     240          34 :                 ReplicationSlotValidateName(opts->slot_name, false, ERROR);
     241             :         }
     242         488 :         else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
     243         320 :                  strcmp(defel->defname, "copy_data") == 0)
     244             :         {
     245          56 :             if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
     246           0 :                 errorConflictingDefElem(defel, pstate);
     247             : 
     248          56 :             opts->specified_opts |= SUBOPT_COPY_DATA;
     249          56 :             opts->copy_data = defGetBoolean(defel);
     250             :         }
     251         432 :         else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
     252         342 :                  strcmp(defel->defname, "synchronous_commit") == 0)
     253             :         {
     254          12 :             if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
     255           0 :                 errorConflictingDefElem(defel, pstate);
     256             : 
     257          12 :             opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
     258          12 :             opts->synchronous_commit = defGetString(defel);
     259             : 
     260             :             /* Test if the given value is valid for synchronous_commit GUC; the call's return value is deliberately discarded. */
     261          12 :             (void) set_config_option("synchronous_commit", opts->synchronous_commit,
     262             :                                      PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
     263             :                                      false, 0, false);
     264             :         }
     265         420 :         else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
     266          66 :                  strcmp(defel->defname, "refresh") == 0)
     267             :         {
     268          66 :             if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
     269           0 :                 errorConflictingDefElem(defel, pstate);
     270             : 
     271          66 :             opts->specified_opts |= SUBOPT_REFRESH;
     272          66 :             opts->refresh = defGetBoolean(defel);
     273             :         }
     274         354 :         else if (IsSet(supported_opts, SUBOPT_BINARY) &&
     275         330 :                  strcmp(defel->defname, "binary") == 0)
     276             :         {
     277          32 :             if (IsSet(opts->specified_opts, SUBOPT_BINARY))
     278           0 :                 errorConflictingDefElem(defel, pstate);
     279             : 
     280          32 :             opts->specified_opts |= SUBOPT_BINARY;
     281          32 :             opts->binary = defGetBoolean(defel);
     282             :         }
     283         322 :         else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
     284         298 :                  strcmp(defel->defname, "streaming") == 0)
     285             :         {
     286          74 :             if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
     287           0 :                 errorConflictingDefElem(defel, pstate);
     288             : 
     289          74 :             opts->specified_opts |= SUBOPT_STREAMING;
     290          74 :             opts->streaming = defGetStreamingMode(defel);
     291             :         }
     292         248 :         else if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
     293         224 :                  strcmp(defel->defname, "two_phase") == 0)
     294             :         {
     295          42 :             if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
     296           0 :                 errorConflictingDefElem(defel, pstate);
     297             : 
     298          42 :             opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
     299          42 :             opts->twophase = defGetBoolean(defel);
     300             :         }
     301         206 :         else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
     302         182 :                  strcmp(defel->defname, "disable_on_error") == 0)
     303             :         {
     304          20 :             if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
     305           0 :                 errorConflictingDefElem(defel, pstate);
     306             : 
     307          20 :             opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
     308          20 :             opts->disableonerr = defGetBoolean(defel);
     309             :         }
     310         186 :         else if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED) &&
     311         162 :                  strcmp(defel->defname, "password_required") == 0)
     312             :         {
     313          24 :             if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
     314           0 :                 errorConflictingDefElem(defel, pstate);
     315             : 
     316          24 :             opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
     317          24 :             opts->passwordrequired = defGetBoolean(defel);
     318             :         }
     319         162 :         else if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER) &&
     320         138 :                  strcmp(defel->defname, "run_as_owner") == 0)
     321             :         {
     322          18 :             if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
     323           0 :                 errorConflictingDefElem(defel, pstate);
     324             : 
     325          18 :             opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
     326          18 :             opts->runasowner = defGetBoolean(defel);
     327             :         }
     328         144 :         else if (IsSet(supported_opts, SUBOPT_FAILOVER) &&
     329         120 :                  strcmp(defel->defname, "failover") == 0)
     330             :         {
     331          26 :             if (IsSet(opts->specified_opts, SUBOPT_FAILOVER))
     332           0 :                 errorConflictingDefElem(defel, pstate);
     333             : 
     334          26 :             opts->specified_opts |= SUBOPT_FAILOVER;
     335          26 :             opts->failover = defGetBoolean(defel);
     336             :         }
     337         118 :         else if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES) &&
     338          94 :                  strcmp(defel->defname, "retain_dead_tuples") == 0)
     339             :         {
     340          24 :             if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
     341           0 :                 errorConflictingDefElem(defel, pstate);
     342             : 
     343          24 :             opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
     344          24 :             opts->retaindeadtuples = defGetBoolean(defel);
     345             :         }
     346          94 :         else if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION) &&
     347          70 :                  strcmp(defel->defname, "max_retention_duration") == 0)
     348             :         {
     349          22 :             if (IsSet(opts->specified_opts, SUBOPT_MAX_RETENTION_DURATION))
     350           0 :                 errorConflictingDefElem(defel, pstate);
     351             : 
     352          22 :             opts->specified_opts |= SUBOPT_MAX_RETENTION_DURATION;
     353          22 :             opts->maxretention = defGetInt32(defel);
     354             :         }
     355          72 :         else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
     356          48 :                  strcmp(defel->defname, "origin") == 0)
     357             :         {
     358          42 :             if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
     359           0 :                 errorConflictingDefElem(defel, pstate);
     360             : 
     361          42 :             opts->specified_opts |= SUBOPT_ORIGIN;
     362          42 :             pfree(opts->origin);    /* release the default string allocated before the loop */
     363             : 
     364             :             /*
     365             :              * Even though the "origin" parameter allows only "none" and "any"
     366             :              * values, it is implemented as a string type so that the
     367             :              * parameter can be extended in future versions to support
     368             :              * filtering using origin names specified by the user.
     369             :              */
     370          42 :             opts->origin = defGetString(defel);
     371             : 
     372          58 :             if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
     373          16 :                 (pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
     374           6 :                 ereport(ERROR,
     375             :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     376             :                         errmsg("unrecognized origin value: \"%s\"", opts->origin));
     377             :         }
     378          30 :         else if (IsSet(supported_opts, SUBOPT_LSN) &&
     379          24 :                  strcmp(defel->defname, "lsn") == 0)
     380          18 :         {
     381          24 :             char       *lsn_str = defGetString(defel);
     382             :             XLogRecPtr  lsn;
     383             : 
     384          24 :             if (IsSet(opts->specified_opts, SUBOPT_LSN))
     385           0 :                 errorConflictingDefElem(defel, pstate);
     386             : 
     387             :             /* Setting lsn = NONE is treated as resetting LSN (stored as InvalidXLogRecPtr) */
     388          24 :             if (strcmp(lsn_str, "none") == 0)
     389           6 :                 lsn = InvalidXLogRecPtr;
     390             :             else
     391             :             {
     392             :                 /* Parse the argument as LSN */
     393          18 :                 lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
     394             :                                                       CStringGetDatum(lsn_str)));
     395             : 
     396          18 :                 if (XLogRecPtrIsInvalid(lsn))   /* an explicitly written invalid LSN is rejected; only "none" selects it */
     397           6 :                     ereport(ERROR,
     398             :                             (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     399             :                              errmsg("invalid WAL location (LSN): %s", lsn_str)));
     400             :             }
     401             : 
     402          18 :             opts->specified_opts |= SUBOPT_LSN;
     403          18 :             opts->lsn = lsn;
     404             :         }
     405             :         else
     406           6 :             ereport(ERROR,
     407             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     408             :                      errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
     409             :     }
     410             : 
     411             :     /*
     412             :      * We've been explicitly asked to not connect, that requires some
     413             :      * additional processing.
     414             :      */
     415         904 :     if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
     416             :     {
     417             :         /* Check for incompatible options from the user.  Only explicitly specified true values conflict; defaulted ones are switched off below. */
     418         148 :         if (opts->enabled &&
     419         148 :             IsSet(opts->specified_opts, SUBOPT_ENABLED))
     420           6 :             ereport(ERROR,
     421             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     422             :             /*- translator: both %s are strings of the form "option = value" */
     423             :                      errmsg("%s and %s are mutually exclusive options",
     424             :                             "connect = false", "enabled = true")));
     425             : 
     426         142 :         if (opts->create_slot &&
     427         136 :             IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
     428           6 :             ereport(ERROR,
     429             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     430             :                      errmsg("%s and %s are mutually exclusive options",
     431             :                             "connect = false", "create_slot = true")));
     432             : 
     433         136 :         if (opts->copy_data &&
     434         130 :             IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
     435           6 :             ereport(ERROR,
     436             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     437             :                      errmsg("%s and %s are mutually exclusive options",
     438             :                             "connect = false", "copy_data = true")));
     439             : 
     440             :         /* Change the defaults of other options. */
     441         130 :         opts->enabled = false;
     442         130 :         opts->create_slot = false;
     443         130 :         opts->copy_data = false;
     444             :     }
     445             : 
     446             :     /*
     447             :      * Do additional checking for disallowed combination when slot_name = NONE
     448             :      * was used.  Unlike the connect = false case above, a defaulted enabled/create_slot = true is also an error here: the user is told to set the option to false explicitly.
     449             :      */
     450         886 :     if (!opts->slot_name &&
     451         858 :         IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
     452             :     {
     453         116 :         if (opts->enabled)
     454             :         {
     455          18 :             if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
     456           6 :                 ereport(ERROR,
     457             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     458             :                 /*- translator: both %s are strings of the form "option = value" */
     459             :                          errmsg("%s and %s are mutually exclusive options",
     460             :                                 "slot_name = NONE", "enabled = true")));
     461             :             else
     462          12 :                 ereport(ERROR,
     463             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     464             :                 /*- translator: both %s are strings of the form "option = value" */
     465             :                          errmsg("subscription with %s must also set %s",
     466             :                                 "slot_name = NONE", "enabled = false")));
     467             :         }
     468             : 
     469          98 :         if (opts->create_slot)
     470             :         {
     471          12 :             if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
     472           6 :                 ereport(ERROR,
     473             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     474             :                 /*- translator: both %s are strings of the form "option = value" */
     475             :                          errmsg("%s and %s are mutually exclusive options",
     476             :                                 "slot_name = NONE", "create_slot = true")));
     477             :             else
     478           6 :                 ereport(ERROR,
     479             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     480             :                 /*- translator: both %s are strings of the form "option = value" */
     481             :                          errmsg("subscription with %s must also set %s",
     482             :                                 "slot_name = NONE", "create_slot = false")));
     483             :         }
     484             :     }
     485         856 : }
     486             : 
     487             : /*
     488             :  * Check that the specified publications are present on the publisher.
                     :  *
                     :  * Runs a query against pg_catalog.pg_publication over 'wrconn' for the names
                     :  * in 'publications', then removes every name the publisher returned from a
                     :  * private copy of the list.  Whatever is left in the copy was not found
                     :  * remotely.
                     :  *
                     :  * Raises ERROR if the query does not come back with tuples.  Publications
                     :  * that are missing are only reported with a WARNING, so this function
                     :  * returns normally in that case.
     489             :  */
     490             : static void
     491         256 : check_publications(WalReceiverConn *wrconn, List *publications)
     492             : {
     493             :     WalRcvExecResult *res;
     494             :     StringInfo  cmd;
     495             :     TupleTableSlot *slot;
     496         256 :     List       *publicationsCopy = NIL;
     497         256 :     Oid         tableRow[1] = {TEXTOID};
     498             : 
     499         256 :     cmd = makeStringInfo();
     500         256 :     appendStringInfoString(cmd, "SELECT t.pubname FROM\n"
     501             :                            " pg_catalog.pg_publication t WHERE\n"
     502             :                            " t.pubname IN (");
     503         256 :     GetPublicationsStr(publications, cmd, true);
     504         256 :     appendStringInfoChar(cmd, ')');
     505             : 
     506         256 :     res = walrcv_exec(wrconn, cmd->data, 1, tableRow);
     507         256 :     destroyStringInfo(cmd);
     508             : 
     509         256 :     if (res->status != WALRCV_OK_TUPLES)
     510           0 :         ereport(ERROR,
     511             :                 errmsg("could not receive list of publications from the publisher: %s",
     512             :                        res->err));
     513             : 
     514         256 :     publicationsCopy = list_copy(publications);
     515             : 
     516             :     /* Process publication(s): one result row per publication that exists. */
     517         256 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     518         564 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     519             :     {
     520             :         char       *pubname;
     521             :         bool        isnull;
     522             : 
     523         308 :         pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
     524             :         Assert(!isnull);
     525             : 
     526             :         /* Delete the publication present in publisher from the list. */
     527         308 :         publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
     528         308 :         ExecClearTuple(slot);
     529             :     }
     530             : 
     531         256 :     ExecDropSingleTupleTableSlot(slot);
     532             : 
     533         256 :     walrcv_clear_result(res);
     534             : 
     535         256 :     if (list_length(publicationsCopy))
     536             :     {
     537             :         /* Prepare the list of non-existent publication(s) for error message. */
     538           8 :         StringInfo  pubnames = makeStringInfo();
     539             : 
     540           8 :         GetPublicationsStr(publicationsCopy, pubnames, false);
     541           8 :         ereport(WARNING,
     542             :                 errcode(ERRCODE_UNDEFINED_OBJECT),
     543             :                 errmsg_plural("publication %s does not exist on the publisher",
     544             :                               "publications %s do not exist on the publisher",
     545             :                               list_length(publicationsCopy),
     546             :                               pubnames->data));
     547             :     }
     548         256 : }
     549             : 
     550             : /*
     551             :  * Auxiliary function to build a text array out of a list of String nodes.
                     :  *
                     :  * The scratch array of Datums is allocated in a private memory context that
                     :  * is deleted before returning.  The resulting array is constructed only
                     :  * after switching back to the context that was current on entry, and is
                     :  * returned as a pointer Datum.
                     :  *
                     :  * NOTE(review): check_duplicates_in_publist() presumably fills 'datums' from
                     :  * 'publist' and errors out on duplicate names -- confirm against its
                     :  * definition, which is not part of this function.
     552             :  */
     553             : static Datum
     554         394 : publicationListToArray(List *publist)
     555             : {
     556             :     ArrayType  *arr;
     557             :     Datum      *datums;
     558             :     MemoryContext memcxt;
     559             :     MemoryContext oldcxt;
     560             : 
     561             :     /* Create memory context for temporary allocations. */
     562         394 :     memcxt = AllocSetContextCreate(CurrentMemoryContext,
     563             :                                    "publicationListToArray to array",
     564             :                                    ALLOCSET_DEFAULT_SIZES);
     565         394 :     oldcxt = MemoryContextSwitchTo(memcxt);
     566             : 
     567         394 :     datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
     568             : 
     569         394 :     check_duplicates_in_publist(publist, datums);
     570             : 
     571         388 :     MemoryContextSwitchTo(oldcxt);
     572             : 
     573         388 :     arr = construct_array_builtin(datums, list_length(publist), TEXTOID);
     574             : 
     575         388 :     MemoryContextDelete(memcxt);
     576             : 
     577         388 :     return PointerGetDatum(arr);
     578             : }
     579             : 
     580             : /*
     581             :  * Create new subscription.
                     :  *
                     :  * Parses and validates the statement options, checks that the current user
                     :  * is allowed to create subscriptions in this database, inserts the new
                     :  * pg_subscription row and creates the replication origin.  When the connect
                     :  * option is set it then connects to the publisher to validate the
                     :  * publications, record the initial state of every published relation and,
                     :  * if requested, create the replication slot; otherwise a WARNING tells the
                     :  * user which steps must be done manually.
                     :  *
                     :  * 'isTopLevel' is passed to PreventInTransactionBlock() when a slot is to
                     :  * be created.
                     :  *
                     :  * Returns the ObjectAddress of the new subscription.
     582             :  */
     583             : ObjectAddress
     584         482 : CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
     585             :                    bool isTopLevel)
     586             : {
     587             :     Relation    rel;
     588             :     ObjectAddress myself;
     589             :     Oid         subid;
     590             :     bool        nulls[Natts_pg_subscription];
     591             :     Datum       values[Natts_pg_subscription];
     592         482 :     Oid         owner = GetUserId();
     593             :     HeapTuple   tup;
     594             :     char       *conninfo;
     595             :     char        originname[NAMEDATALEN];
     596             :     List       *publications;
     597             :     bits32      supported_opts;
     598         482 :     SubOpts     opts = {0};
     599             :     AclResult   aclresult;
     600             : 
     601             :     /*
     602             :      * Parse and check options.
     603             :      *
     604             :      * Connection and publication should not be specified here.
     605             :      */
     606         482 :     supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
     607             :                       SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
     608             :                       SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
     609             :                       SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
     610             :                       SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
     611             :                       SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
     612             :                       SUBOPT_RETAIN_DEAD_TUPLES |
     613             :                       SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN);
     614         482 :     parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
     615             : 
     616             :     /*
     617             :      * Since creating a replication slot is not transactional, rolling back
     618             :      * the transaction leaves the created replication slot.  So we cannot run
     619             :      * CREATE SUBSCRIPTION inside a transaction block if creating a
     620             :      * replication slot.
     621             :      */
     622         392 :     if (opts.create_slot)
     623         252 :         PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
     624             : 
     625             :     /*
     626             :      * We don't want to allow unprivileged users to be able to trigger
     627             :      * attempts to access arbitrary network destinations, so require the user
     628             :      * to have been specifically authorized to create subscriptions.
     629             :      */
     630         386 :     if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
     631           6 :         ereport(ERROR,
     632             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     633             :                  errmsg("permission denied to create subscription"),
     634             :                  errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
     635             :                            "pg_create_subscription")));
     636             : 
     637             :     /*
     638             :      * Since a subscription is a database object, we also check for CREATE
     639             :      * permission on the database.
     640             :      */
     641         380 :     aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
     642             :                                 owner, ACL_CREATE);
     643         380 :     if (aclresult != ACLCHECK_OK)
     644          12 :         aclcheck_error(aclresult, OBJECT_DATABASE,
     645           6 :                        get_database_name(MyDatabaseId));
     646             : 
     647             :     /*
     648             :      * Non-superusers are required to set a password for authentication, and
     649             :      * that password must be used by the target server, but the superuser can
     650             :      * exempt a subscription from this requirement.
     651             :      */
     652         374 :     if (!opts.passwordrequired && !superuser_arg(owner))
     653           6 :         ereport(ERROR,
     654             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     655             :                  errmsg("password_required=false is superuser-only"),
     656             :                  errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
     657             : 
     658             :     /*
     659             :      * If built with appropriate switch, whine when regression-testing
     660             :      * conventions for subscription names are violated.
     661             :      */
     662             : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
     663             :     if (strncmp(stmt->subname, "regress_", 8) != 0)
     664             :         elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
     665             : #endif
     666             : 
     667         368 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     668             : 
     669             :     /* Check if name is used */
     670         368 :     subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
     671             :                             ObjectIdGetDatum(MyDatabaseId), CStringGetDatum(stmt->subname));
     672         368 :     if (OidIsValid(subid))
     673             :     {
     674           6 :         ereport(ERROR,
     675             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     676             :                  errmsg("subscription \"%s\" already exists",
     677             :                         stmt->subname)));
     678             :     }
     679             : 
     680             :     /*
     681             :      * Ensure that system configuration parameters are set appropriately to
     682             :      * support retain_dead_tuples and max_retention_duration.
     683             :      */
     684         362 :     CheckSubDeadTupleRetention(true, !opts.enabled, WARNING,
     685         362 :                                opts.retaindeadtuples, opts.retaindeadtuples,
     686         362 :                                (opts.maxretention > 0));
     687             : 
     688         362 :     if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
     689         320 :         opts.slot_name == NULL)
     690         320 :         opts.slot_name = stmt->subname;
     691             : 
     692             :     /* The default for synchronous_commit of subscriptions is off. */
     693         362 :     if (opts.synchronous_commit == NULL)
     694         362 :         opts.synchronous_commit = "off";
     695             : 
     696         362 :     conninfo = stmt->conninfo;
     697         362 :     publications = stmt->publication;
     698             : 
     699             :     /* Load the library providing us libpq calls. */
     700         362 :     load_file("libpqwalreceiver", false);
     701             : 
     702             :     /* Check the connection info string. */
     703         362 :     walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
     704             : 
     705             :     /* Everything ok, form a new tuple. */
     706         344 :     memset(values, 0, sizeof(values));
     707         344 :     memset(nulls, false, sizeof(nulls));
     708             : 
     709         344 :     subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
     710             :                                Anum_pg_subscription_oid);
     711         344 :     values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
     712         344 :     values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
     713         344 :     values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
     714         344 :     values[Anum_pg_subscription_subname - 1] =
     715         344 :         DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
     716         344 :     values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
     717         344 :     values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
     718         344 :     values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
     719         344 :     values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
     720         344 :     values[Anum_pg_subscription_subtwophasestate - 1] =
     721         344 :         CharGetDatum(opts.twophase ?
     722             :                      LOGICALREP_TWOPHASE_STATE_PENDING :
     723             :                      LOGICALREP_TWOPHASE_STATE_DISABLED);
     724         344 :     values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
     725         344 :     values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
     726         344 :     values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
     727         344 :     values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
     728         344 :     values[Anum_pg_subscription_subretaindeadtuples - 1] =
     729         344 :         BoolGetDatum(opts.retaindeadtuples);
     730         344 :     values[Anum_pg_subscription_submaxretention - 1] =
     731         344 :         Int32GetDatum(opts.maxretention);
                     :     /* Retention starts out active iff retain_dead_tuples was requested. */
     732         344 :     values[Anum_pg_subscription_subretentionactive - 1] =
     733         344 :         BoolGetDatum(opts.retaindeadtuples);
     734         344 :     values[Anum_pg_subscription_subconninfo - 1] =
     735         344 :         CStringGetTextDatum(conninfo);
     736         344 :     if (opts.slot_name)
     737         324 :         values[Anum_pg_subscription_subslotname - 1] =
     738         324 :             DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
     739             :     else
     740          20 :         nulls[Anum_pg_subscription_subslotname - 1] = true;
     741         344 :     values[Anum_pg_subscription_subsynccommit - 1] =
     742         344 :         CStringGetTextDatum(opts.synchronous_commit);
     743         338 :     values[Anum_pg_subscription_subpublications - 1] =
     744         344 :         publicationListToArray(publications);
     745         338 :     values[Anum_pg_subscription_suborigin - 1] =
     746         338 :         CStringGetTextDatum(opts.origin);
     747             : 
     748         338 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     749             : 
     750             :     /* Insert tuple into catalog. */
     751         338 :     CatalogTupleInsert(rel, tup);
     752         338 :     heap_freetuple(tup);
     753             : 
     754         338 :     recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
     755             : 
     756             :     /*
     757             :      * A replication origin is currently created for all subscriptions,
     758             :      * including those that only contain sequences or are otherwise empty.
     759             :      *
     760             :      * XXX: While this is technically unnecessary, optimizing it would require
     761             :      * additional logic to skip origin creation during DDL operations and
     762             :      * apply workers initialization, and to handle origin creation dynamically
     763             :      * when tables are added to the subscription. It is not clear whether
     764             :      * preventing creation of origins is worth additional complexity.
     765             :      */
     766         338 :     ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
     767         338 :     replorigin_create(originname);
     768             : 
     769             :     /*
     770             :      * Connect to remote side to execute requested commands and fetch table
     771             :      * and sequence info.
     772             :      */
     773         338 :     if (opts.connect)
     774             :     {
     775             :         char       *err;
     776             :         WalReceiverConn *wrconn;
     777             :         bool        must_use_password;
     778             : 
     779             :         /* Try to connect to the publisher. */
     780         244 :         must_use_password = !superuser_arg(owner) && opts.passwordrequired;
     781         244 :         wrconn = walrcv_connect(conninfo, true, true, must_use_password,
     782             :                                 stmt->subname, &err);
     783         244 :         if (!wrconn)
     784           6 :             ereport(ERROR,
     785             :                     (errcode(ERRCODE_CONNECTION_FAILURE),
     786             :                      errmsg("subscription \"%s\" could not connect to the publisher: %s",
     787             :                             stmt->subname, err)));
     788             : 
     789         238 :         PG_TRY();
     790             :         {
     791         238 :             bool        has_tables = false;
     792             :             List       *pubrels;
     793             :             char        relation_state;
     794             : 
     795         238 :             check_publications(wrconn, publications);
     796         238 :             check_publications_origin_tables(wrconn, publications,
     797         238 :                                              opts.copy_data,
     798         238 :                                              opts.retaindeadtuples, opts.origin,
     799             :                                              NULL, 0, stmt->subname);
     800         238 :             check_publications_origin_sequences(wrconn, publications,
     801         238 :                                                 opts.copy_data, opts.origin,
     802             :                                                 NULL, 0, stmt->subname);
     803             : 
     804         238 :             if (opts.retaindeadtuples)
     805           6 :                 check_pub_dead_tuple_retention(wrconn);
     806             : 
     807             :             /*
     808             :              * Set sync state based on if we were asked to do data copy or
     809             :              * not.
     810             :              */
     811         238 :             relation_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
     812             : 
     813             :             /*
     814             :              * Build local relation status info. Relations are for both tables
     815             :              * and sequences from the publisher.
     816             :              */
     817         238 :             pubrels = fetch_relation_list(wrconn, publications);
     818             : 
     819         834 :             foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
     820             :             {
     821             :                 Oid         relid;
     822             :                 char        relkind;
     823         362 :                 RangeVar   *rv = pubrelinfo->rv;
     824             : 
     825         362 :                 relid = RangeVarGetRelid(rv, AccessShareLock, false);
     826         362 :                 relkind = get_rel_relkind(relid);
     827             : 
     828             :                 /* Check for supported relkind. */
     829         362 :                 CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
     830         362 :                                          rv->schemaname, rv->relname);
     831         362 :                 has_tables |= (relkind != RELKIND_SEQUENCE);
     832         362 :                 AddSubscriptionRelState(subid, relid, relation_state,
     833             :                                         InvalidXLogRecPtr, true);
     834             :             }
     835             : 
     836             :             /*
     837             :              * If requested, create permanent slot for the subscription. We
     838             :              * won't use the initial snapshot for anything, so no need to
     839             :              * export it.
     840             :              *
     841             :              * XXX: Similar to origins, it is not clear whether preventing the
     842             :              * slot creation for empty and sequence-only subscriptions is
     843             :              * worth additional complexity.
     844             :              */
     845         236 :             if (opts.create_slot)
     846             :             {
     847         226 :                 bool        twophase_enabled = false;
     848             : 
     849             :                 Assert(opts.slot_name);
     850             : 
     851             :                 /*
     852             :                  * Even if two_phase is set, don't create the slot with
     853             :                  * two-phase enabled. Will enable it once all the tables are
     854             :                  * synced and ready. This avoids race-conditions like prepared
     855             :                  * transactions being skipped due to changes not being applied
     856             :                  * due to checks in should_apply_changes_for_rel() when
     857             :                  * tablesync for the corresponding tables are in progress. See
     858             :                  * comments atop worker.c.
     859             :                  *
     860             :                  * Note that if tables were specified but copy_data is false
     861             :                  * then it is safe to enable two_phase up-front because those
     862             :                  * tables are already initially in READY state. When the
     863             :                  * subscription has no tables, we leave the twophase state as
     864             :                  * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
     865             :                  * PUBLICATION to work.
     866             :                  */
     867         226 :                 if (opts.twophase && !opts.copy_data && has_tables)
     868           2 :                     twophase_enabled = true;
     869             : 
     870         226 :                 walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
     871             :                                    opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
     872             : 
     873         226 :                 if (twophase_enabled)
     874           2 :                     UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
     875             : 
     876         226 :                 ereport(NOTICE,
     877             :                         (errmsg("created replication slot \"%s\" on publisher",
     878             :                                 opts.slot_name)));
     879             :             }
     880             :         }
     881           2 :         PG_FINALLY();
     882             :         {
     883         238 :             walrcv_disconnect(wrconn);
     884             :         }
     885         238 :         PG_END_TRY();
     886             :     }
     887             :     else
     888          94 :         ereport(WARNING,
     889             :                 (errmsg("subscription was created, but is not connected"),
     890             :                  errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.")));
     891             : 
     892         330 :     table_close(rel, RowExclusiveLock);
     893             : 
     894         330 :     pgstat_create_subscription(subid);
     895             : 
     896             :     /*
     897             :      * Notify the launcher to start the apply worker if the subscription is
     898             :      * enabled, or to create the conflict detection slot if retain_dead_tuples
     899             :      * is enabled.
     900             :      *
     901             :      * Creating the conflict detection slot is essential even when the
     902             :      * subscription is not enabled. This ensures that dead tuples are
     903             :      * retained, which is necessary for accurately identifying the type of
     904             :      * conflict during replication.
     905             :      */
     906         330 :     if (opts.enabled || opts.retaindeadtuples)
     907         226 :         ApplyLauncherWakeupAtCommit();
     908             : 
     909         330 :     ObjectAddressSet(myself, SubscriptionRelationId, subid);
     910             : 
     911         330 :     InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
     912             : 
     913         330 :     return myself;
     914             : }
     915             : 
     916             : static void
     917          70 : AlterSubscription_refresh(Subscription *sub, bool copy_data,
     918             :                           List *validate_publications)
     919             : {
     920             :     char       *err;
     921          70 :     List       *pubrels = NIL;
     922             :     Oid        *pubrel_local_oids;
     923             :     List       *subrel_states;
     924          70 :     List       *sub_remove_rels = NIL;
     925             :     Oid        *subrel_local_oids;
     926             :     Oid        *subseq_local_oids;
     927             :     int         subrel_count;
     928             :     ListCell   *lc;
     929             :     int         off;
     930          70 :     int         tbl_count = 0;
     931          70 :     int         seq_count = 0;
     932          70 :     Relation    rel = NULL;
     933             :     typedef struct SubRemoveRels
     934             :     {
     935             :         Oid         relid;
     936             :         char        state;
     937             :     } SubRemoveRels;
     938             : 
     939             :     WalReceiverConn *wrconn;
     940             :     bool        must_use_password;
     941             : 
     942             :     /* Load the library providing us libpq calls. */
     943          70 :     load_file("libpqwalreceiver", false);
     944             : 
     945             :     /* Try to connect to the publisher. */
     946          70 :     must_use_password = sub->passwordrequired && !sub->ownersuperuser;
     947          70 :     wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
     948             :                             sub->name, &err);
     949          68 :     if (!wrconn)
     950           0 :         ereport(ERROR,
     951             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
     952             :                  errmsg("subscription \"%s\" could not connect to the publisher: %s",
     953             :                         sub->name, err)));
     954             : 
     955          68 :     PG_TRY();
     956             :     {
     957          68 :         if (validate_publications)
     958          18 :             check_publications(wrconn, validate_publications);
     959             : 
     960             :         /* Get the relation list from publisher. */
     961          68 :         pubrels = fetch_relation_list(wrconn, sub->publications);
     962             : 
     963             :         /* Get local relation list. */
     964          68 :         subrel_states = GetSubscriptionRelations(sub->oid, true, true, false);
     965          68 :         subrel_count = list_length(subrel_states);
     966             : 
     967             :         /*
     968             :          * Build qsorted arrays of local table oids and sequence oids for
     969             :          * faster lookup. This can potentially contain all tables and
     970             :          * sequences in the database so speed of lookup is important.
     971             :          *
     972             :          * We do not yet know the exact count of tables and sequences, so we
     973             :          * allocate separate arrays for table OIDs and sequence OIDs based on
     974             :          * the total number of relations (subrel_count).
     975             :          */
     976          68 :         subrel_local_oids = palloc(subrel_count * sizeof(Oid));
     977          68 :         subseq_local_oids = palloc(subrel_count * sizeof(Oid));
     978         244 :         foreach(lc, subrel_states)
     979             :         {
     980         176 :             SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
     981             : 
     982         176 :             if (get_rel_relkind(relstate->relid) == RELKIND_SEQUENCE)
     983           0 :                 subseq_local_oids[seq_count++] = relstate->relid;
     984             :             else
     985         176 :                 subrel_local_oids[tbl_count++] = relstate->relid;
     986             :         }
     987             : 
     988          68 :         qsort(subrel_local_oids, tbl_count, sizeof(Oid), oid_cmp);
     989          68 :         check_publications_origin_tables(wrconn, sub->publications, copy_data,
     990          68 :                                          sub->retaindeadtuples, sub->origin,
     991             :                                          subrel_local_oids, tbl_count,
     992             :                                          sub->name);
     993             : 
     994          68 :         qsort(subseq_local_oids, seq_count, sizeof(Oid), oid_cmp);
     995          68 :         check_publications_origin_sequences(wrconn, sub->publications,
     996             :                                             copy_data, sub->origin,
     997             :                                             subseq_local_oids, seq_count,
     998             :                                             sub->name);
     999             : 
    1000             :         /*
    1001             :          * Walk over the remote relations and try to match them to locally
    1002             :          * known relations. If the relation is not known locally create a new
    1003             :          * state for it.
    1004             :          *
    1005             :          * Also builds array of local oids of remote relations for the next
    1006             :          * step.
    1007             :          */
    1008          68 :         off = 0;
    1009          68 :         pubrel_local_oids = palloc(list_length(pubrels) * sizeof(Oid));
    1010             : 
    1011         316 :         foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
    1012             :         {
    1013         180 :             RangeVar   *rv = pubrelinfo->rv;
    1014             :             Oid         relid;
    1015             :             char        relkind;
    1016             : 
    1017         180 :             relid = RangeVarGetRelid(rv, AccessShareLock, false);
    1018         180 :             relkind = get_rel_relkind(relid);
    1019             : 
    1020             :             /* Check for supported relkind. */
    1021         180 :             CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
    1022         180 :                                      rv->schemaname, rv->relname);
    1023             : 
    1024         180 :             pubrel_local_oids[off++] = relid;
    1025             : 
    1026         180 :             if (!bsearch(&relid, subrel_local_oids,
    1027          46 :                          tbl_count, sizeof(Oid), oid_cmp) &&
    1028          46 :                 !bsearch(&relid, subseq_local_oids,
    1029             :                          seq_count, sizeof(Oid), oid_cmp))
    1030             :             {
    1031          46 :                 AddSubscriptionRelState(sub->oid, relid,
    1032             :                                         copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
    1033             :                                         InvalidXLogRecPtr, true);
    1034          46 :                 ereport(DEBUG1,
    1035             :                         errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"",
    1036             :                                         relkind == RELKIND_SEQUENCE ? "sequence" : "table",
    1037             :                                         rv->schemaname, rv->relname, sub->name));
    1038             :             }
    1039             :         }
    1040             : 
    1041             :         /*
    1042             :          * Next remove state for tables we should not care about anymore using
    1043             :          * the data we collected above
    1044             :          */
    1045          68 :         qsort(pubrel_local_oids, list_length(pubrels), sizeof(Oid), oid_cmp);
    1046             : 
    1047         244 :         for (off = 0; off < tbl_count; off++)
    1048             :         {
    1049         176 :             Oid         relid = subrel_local_oids[off];
    1050             : 
    1051         176 :             if (!bsearch(&relid, pubrel_local_oids,
    1052         176 :                          list_length(pubrels), sizeof(Oid), oid_cmp))
    1053             :             {
    1054             :                 char        state;
    1055             :                 XLogRecPtr  statelsn;
    1056          42 :                 SubRemoveRels *remove_rel = palloc(sizeof(SubRemoveRels));
    1057             : 
    1058             :                 /*
    1059             :                  * Lock pg_subscription_rel with AccessExclusiveLock to
    1060             :                  * prevent any race conditions with the apply worker
    1061             :                  * re-launching workers at the same time this code is trying
    1062             :                  * to remove those tables.
    1063             :                  *
    1064             :                  * Even if new worker for this particular rel is restarted it
    1065             :                  * won't be able to make any progress as we hold exclusive
    1066             :                  * lock on pg_subscription_rel till the transaction end. It
    1067             :                  * will simply exit as there is no corresponding rel entry.
    1068             :                  *
    1069             :                  * This locking also ensures that the state of rels won't
    1070             :                  * change till we are done with this refresh operation.
    1071             :                  */
    1072          42 :                 if (!rel)
    1073          18 :                     rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
    1074             : 
    1075             :                 /* Last known rel state. */
    1076          42 :                 state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
    1077             : 
    1078          42 :                 RemoveSubscriptionRel(sub->oid, relid);
    1079             : 
    1080          42 :                 remove_rel->relid = relid;
    1081          42 :                 remove_rel->state = state;
    1082             : 
    1083          42 :                 sub_remove_rels = lappend(sub_remove_rels, remove_rel);
    1084             : 
    1085          42 :                 logicalrep_worker_stop(WORKERTYPE_TABLESYNC, sub->oid, relid);
    1086             : 
    1087             :                 /*
    1088             :                  * For READY state, we would have already dropped the
    1089             :                  * tablesync origin.
    1090             :                  */
    1091          42 :                 if (state != SUBREL_STATE_READY)
    1092             :                 {
    1093             :                     char        originname[NAMEDATALEN];
    1094             : 
    1095             :                     /*
    1096             :                      * Drop the tablesync's origin tracking if exists.
    1097             :                      *
    1098             :                      * It is possible that the origin is not yet created for
    1099             :                      * tablesync worker, this can happen for the states before
    1100             :                      * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or
    1101             :                      * apply worker can also concurrently try to drop the
    1102             :                      * origin and by this time the origin might be already
    1103             :                      * removed. For these reasons, passing missing_ok = true.
    1104             :                      */
    1105           0 :                     ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
    1106             :                                                        sizeof(originname));
    1107           0 :                     replorigin_drop_by_name(originname, true, false);
    1108             :                 }
    1109             : 
    1110          42 :                 ereport(DEBUG1,
    1111             :                         (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
    1112             :                                          get_namespace_name(get_rel_namespace(relid)),
    1113             :                                          get_rel_name(relid),
    1114             :                                          sub->name)));
    1115             :             }
    1116             :         }
    1117             : 
    1118             :         /*
    1119             :          * Drop the tablesync slots associated with removed tables. This has
    1120             :          * to be at the end because otherwise if there is an error while doing
    1121             :          * the database operations we won't be able to rollback dropped slots.
    1122             :          */
    1123         178 :         foreach_ptr(SubRemoveRels, rel, sub_remove_rels)
    1124             :         {
    1125          42 :             if (rel->state != SUBREL_STATE_READY &&
    1126           0 :                 rel->state != SUBREL_STATE_SYNCDONE)
    1127             :             {
    1128           0 :                 char        syncslotname[NAMEDATALEN] = {0};
    1129             : 
    1130             :                 /*
    1131             :                  * For READY/SYNCDONE states we know the tablesync slot has
    1132             :                  * already been dropped by the tablesync worker.
    1133             :                  *
    1134             :                  * For other states, there is no certainty, maybe the slot
    1135             :                  * does not exist yet. Also, if we fail after removing some of
    1136             :                  * the slots, next time, it will again try to drop already
    1137             :                  * dropped slots and fail. For these reasons, we allow
    1138             :                  * missing_ok = true for the drop.
    1139             :                  */
    1140           0 :                 ReplicationSlotNameForTablesync(sub->oid, rel->relid,
    1141             :                                                 syncslotname, sizeof(syncslotname));
    1142           0 :                 ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
    1143             :             }
    1144             :         }
    1145             : 
    1146             :         /*
    1147             :          * Next remove state for sequences we should not care about anymore
    1148             :          * using the data we collected above
    1149             :          */
    1150          68 :         for (off = 0; off < seq_count; off++)
    1151             :         {
    1152           0 :             Oid         relid = subseq_local_oids[off];
    1153             : 
    1154           0 :             if (!bsearch(&relid, pubrel_local_oids,
    1155           0 :                          list_length(pubrels), sizeof(Oid), oid_cmp))
    1156             :             {
    1157             :                 /*
    1158             :                  * This locking ensures that the state of rels won't change
    1159             :                  * till we are done with this refresh operation.
    1160             :                  */
    1161           0 :                 if (!rel)
    1162           0 :                     rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
    1163             : 
    1164           0 :                 RemoveSubscriptionRel(sub->oid, relid);
    1165             : 
    1166           0 :                 ereport(DEBUG1,
    1167             :                         errmsg_internal("sequence \"%s.%s\" removed from subscription \"%s\"",
    1168             :                                         get_namespace_name(get_rel_namespace(relid)),
    1169             :                                         get_rel_name(relid),
    1170             :                                         sub->name));
    1171             :             }
    1172             :         }
    1173             :     }
    1174           0 :     PG_FINALLY();
    1175             :     {
    1176          68 :         walrcv_disconnect(wrconn);
    1177             :     }
    1178          68 :     PG_END_TRY();
    1179             : 
    1180          68 :     if (rel)
    1181          18 :         table_close(rel, NoLock);
    1182          68 : }
    1183             : 
    1184             : /*
    1185             :  * Marks all sequences with INIT state.
    1186             :  */
    1187             : static void
    1188           0 : AlterSubscription_refresh_seq(Subscription *sub)
    1189             : {
    1190           0 :     char       *err = NULL;
    1191             :     WalReceiverConn *wrconn;
    1192             :     bool        must_use_password;
    1193             : 
    1194             :     /* Load the library providing us libpq calls. */
    1195           0 :     load_file("libpqwalreceiver", false);
    1196             : 
    1197             :     /* Try to connect to the publisher. */
    1198           0 :     must_use_password = sub->passwordrequired && !sub->ownersuperuser;
    1199           0 :     wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
    1200             :                             sub->name, &err);
    1201           0 :     if (!wrconn)
    1202           0 :         ereport(ERROR,
    1203             :                 errcode(ERRCODE_CONNECTION_FAILURE),
    1204             :                 errmsg("subscription \"%s\" could not connect to the publisher: %s",
    1205             :                        sub->name, err));
    1206             : 
    1207           0 :     PG_TRY();
    1208             :     {
    1209             :         List       *subrel_states;
    1210             : 
    1211           0 :         check_publications_origin_sequences(wrconn, sub->publications, true,
    1212             :                                             sub->origin, NULL, 0, sub->name);
    1213             : 
    1214             :         /* Get local sequence list. */
    1215           0 :         subrel_states = GetSubscriptionRelations(sub->oid, false, true, false);
    1216           0 :         foreach_ptr(SubscriptionRelState, subrel, subrel_states)
    1217             :         {
    1218           0 :             Oid         relid = subrel->relid;
    1219             : 
    1220           0 :             UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT,
    1221             :                                        InvalidXLogRecPtr, false);
    1222           0 :             ereport(DEBUG1,
    1223             :                     errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
    1224             :                                     get_namespace_name(get_rel_namespace(relid)),
    1225             :                                     get_rel_name(relid),
    1226             :                                     sub->name));
    1227             :         }
    1228             :     }
    1229           0 :     PG_FINALLY();
    1230             :     {
    1231           0 :         walrcv_disconnect(wrconn);
    1232             :     }
    1233           0 :     PG_END_TRY();
    1234           0 : }
    1235             : 
    1236             : /*
    1237             :  * Common checks for altering failover, two_phase, and retain_dead_tuples
    1238             :  * options.
    1239             :  */
    1240             : static void
    1241          26 : CheckAlterSubOption(Subscription *sub, const char *option,
    1242             :                     bool slot_needs_update, bool isTopLevel)
    1243             : {
    1244             :     Assert(strcmp(option, "failover") == 0 ||
    1245             :            strcmp(option, "two_phase") == 0 ||
    1246             :            strcmp(option, "retain_dead_tuples") == 0);
    1247             : 
    1248             :     /*
    1249             :      * Altering the retain_dead_tuples option does not update the slot on the
    1250             :      * publisher.
    1251             :      */
    1252             :     Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0);
    1253             : 
    1254             :     /*
    1255             :      * Do not allow changing the option if the subscription is enabled. This
    1256             :      * is because both failover and two_phase options of the slot on the
    1257             :      * publisher cannot be modified if the slot is currently acquired by the
    1258             :      * existing walsender.
    1259             :      *
    1260             :      * Note that two_phase is enabled (aka changed from 'false' to 'true') on
    1261             :      * the publisher by the existing walsender, so we could have allowed that
    1262             :      * even when the subscription is enabled. But we kept this restriction for
    1263             :      * the sake of consistency and simplicity.
    1264             :      *
    1265             :      * Additionally, do not allow changing the retain_dead_tuples option when
    1266             :      * the subscription is enabled to prevent race conditions arising from the
    1267             :      * new option value being acknowledged asynchronously by the launcher and
    1268             :      * apply workers.
    1269             :      *
    1270             :      * Without the restriction, a race condition may arise when a user
    1271             :      * disables and immediately re-enables the retain_dead_tuples option. In
    1272             :      * this case, the launcher might drop the slot upon noticing the disabled
    1273             :      * action, while the apply worker may keep maintaining
    1274             :      * oldest_nonremovable_xid without noticing the option change. During this
    1275             :      * period, a transaction ID wraparound could falsely make this ID appear
    1276             :      * as if it originates from the future w.r.t the transaction ID stored in
    1277             :      * the slot maintained by launcher.
    1278             :      *
    1279             :      * Similarly, if the user enables retain_dead_tuples concurrently with the
    1280             :      * launcher starting the worker, the apply worker may start calculating
    1281             :      * oldest_nonremovable_xid before the launcher notices the enable action.
    1282             :      * Consequently, the launcher may update slot.xmin to a newer value than
    1283             :      * that maintained by the worker. In subsequent cycles, upon integrating
    1284             :      * the worker's oldest_nonremovable_xid, the launcher might detect a
    1285             :      * retreat in the calculated xmin, necessitating additional handling.
    1286             :      *
    1287             :      * XXX To address the above race conditions, we can define
    1288             :      * oldest_nonremovable_xid as FullTransactionID and adds the check to
    1289             :      * disallow retreating the conflict slot's xmin. For now, we kept the
    1290             :      * implementation simple by disallowing change to the retain_dead_tuples,
    1291             :      * but in the future we can change this after some more analysis.
    1292             :      *
    1293             :      * Note that we could restrict only the enabling of retain_dead_tuples to
    1294             :      * avoid the race conditions described above, but we maintain the
    1295             :      * restriction for both enable and disable operations for the sake of
    1296             :      * consistency.
    1297             :      */
    1298          26 :     if (sub->enabled)
    1299           4 :         ereport(ERROR,
    1300             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1301             :                  errmsg("cannot set option \"%s\" for enabled subscription",
    1302             :                         option)));
    1303             : 
    1304          22 :     if (slot_needs_update)
    1305             :     {
    1306             :         StringInfoData cmd;
    1307             : 
    1308             :         /*
    1309             :          * A valid slot must be associated with the subscription for us to
    1310             :          * modify any of the slot's properties.
    1311             :          */
    1312          16 :         if (!sub->slotname)
    1313           0 :             ereport(ERROR,
    1314             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1315             :                      errmsg("cannot set option \"%s\" for a subscription that does not have a slot name",
    1316             :                             option)));
    1317             : 
    1318             :         /* The changed option of the slot can't be rolled back. */
    1319          16 :         initStringInfo(&cmd);
    1320          16 :         appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
    1321             : 
    1322          16 :         PreventInTransactionBlock(isTopLevel, cmd.data);
    1323          10 :         pfree(cmd.data);
    1324             :     }
    1325          16 : }
    1326             : 
    1327             : /*
    1328             :  * Alter the existing subscription.
    1329             :  */
    1330             : ObjectAddress
    1331         520 : AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
    1332             :                   bool isTopLevel)
    1333             : {
    1334             :     Relation    rel;
    1335             :     ObjectAddress myself;
    1336             :     bool        nulls[Natts_pg_subscription];
    1337             :     bool        replaces[Natts_pg_subscription];
    1338             :     Datum       values[Natts_pg_subscription];
    1339             :     HeapTuple   tup;
    1340             :     Oid         subid;
    1341         520 :     bool        update_tuple = false;
    1342         520 :     bool        update_failover = false;
    1343         520 :     bool        update_two_phase = false;
    1344         520 :     bool        check_pub_rdt = false;
    1345             :     bool        retain_dead_tuples;
    1346             :     int         max_retention;
    1347             :     bool        retention_active;
    1348             :     char       *origin;
    1349             :     Subscription *sub;
    1350             :     Form_pg_subscription form;
    1351             :     bits32      supported_opts;
    1352         520 :     SubOpts     opts = {0};
    1353             : 
    1354         520 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    1355             : 
    1356             :     /* Fetch the existing tuple. */
    1357         520 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
    1358             :                               CStringGetDatum(stmt->subname));
    1359             : 
    1360         520 :     if (!HeapTupleIsValid(tup))
    1361           6 :         ereport(ERROR,
    1362             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1363             :                  errmsg("subscription \"%s\" does not exist",
    1364             :                         stmt->subname)));
    1365             : 
    1366         514 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    1367         514 :     subid = form->oid;
    1368             : 
    1369             :     /* must be owner */
    1370         514 :     if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
    1371           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
    1372           0 :                        stmt->subname);
    1373             : 
    1374         514 :     sub = GetSubscription(subid, false);
    1375             : 
    1376         514 :     retain_dead_tuples = sub->retaindeadtuples;
    1377         514 :     origin = sub->origin;
    1378         514 :     max_retention = sub->maxretention;
    1379         514 :     retention_active = sub->retentionactive;
    1380             : 
    1381             :     /*
    1382             :      * Don't allow non-superuser modification of a subscription with
    1383             :      * password_required=false.
    1384             :      */
    1385         514 :     if (!sub->passwordrequired && !superuser())
    1386           0 :         ereport(ERROR,
    1387             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1388             :                  errmsg("password_required=false is superuser-only"),
    1389             :                  errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
    1390             : 
    1391             :     /* Lock the subscription so nobody else can do anything with it. */
    1392         514 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
    1393             : 
    1394             :     /* Form a new tuple. */
    1395         514 :     memset(values, 0, sizeof(values));
    1396         514 :     memset(nulls, false, sizeof(nulls));
    1397         514 :     memset(replaces, false, sizeof(replaces));
    1398             : 
    1399         514 :     switch (stmt->kind)
    1400             :     {
    1401         218 :         case ALTER_SUBSCRIPTION_OPTIONS:
    1402             :             {
    1403         218 :                 supported_opts = (SUBOPT_SLOT_NAME |
    1404             :                                   SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
    1405             :                                   SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
    1406             :                                   SUBOPT_DISABLE_ON_ERR |
    1407             :                                   SUBOPT_PASSWORD_REQUIRED |
    1408             :                                   SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
    1409             :                                   SUBOPT_RETAIN_DEAD_TUPLES |
    1410             :                                   SUBOPT_MAX_RETENTION_DURATION |
    1411             :                                   SUBOPT_ORIGIN);
    1412             : 
    1413         218 :                 parse_subscription_options(pstate, stmt->options,
    1414             :                                            supported_opts, &opts);
    1415             : 
    1416         200 :                 if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
    1417             :                 {
    1418             :                     /*
    1419             :                      * The subscription must be disabled to allow slot_name as
    1420             :                      * 'none', otherwise, the apply worker will repeatedly try
    1421             :                      * to stream the data using that slot_name which neither
    1422             :                      * exists on the publisher nor the user will be allowed to
    1423             :                      * create it.
    1424             :                      */
    1425          72 :                     if (sub->enabled && !opts.slot_name)
    1426           0 :                         ereport(ERROR,
    1427             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1428             :                                  errmsg("cannot set %s for enabled subscription",
    1429             :                                         "slot_name = NONE")));
    1430             : 
    1431          72 :                     if (opts.slot_name)
    1432           6 :                         values[Anum_pg_subscription_subslotname - 1] =
    1433           6 :                             DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
    1434             :                     else
    1435          66 :                         nulls[Anum_pg_subscription_subslotname - 1] = true;
    1436          72 :                     replaces[Anum_pg_subscription_subslotname - 1] = true;
    1437             :                 }
    1438             : 
    1439         200 :                 if (opts.synchronous_commit)
    1440             :                 {
    1441           6 :                     values[Anum_pg_subscription_subsynccommit - 1] =
    1442           6 :                         CStringGetTextDatum(opts.synchronous_commit);
    1443           6 :                     replaces[Anum_pg_subscription_subsynccommit - 1] = true;
    1444             :                 }
    1445             : 
    1446         200 :                 if (IsSet(opts.specified_opts, SUBOPT_BINARY))
    1447             :                 {
    1448          18 :                     values[Anum_pg_subscription_subbinary - 1] =
    1449          18 :                         BoolGetDatum(opts.binary);
    1450          18 :                     replaces[Anum_pg_subscription_subbinary - 1] = true;
    1451             :                 }
    1452             : 
    1453         200 :                 if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
    1454             :                 {
    1455          30 :                     values[Anum_pg_subscription_substream - 1] =
    1456          30 :                         CharGetDatum(opts.streaming);
    1457          30 :                     replaces[Anum_pg_subscription_substream - 1] = true;
    1458             :                 }
    1459             : 
    1460         200 :                 if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
    1461             :                 {
    1462             :                     values[Anum_pg_subscription_subdisableonerr - 1]
    1463           6 :                         = BoolGetDatum(opts.disableonerr);
    1464             :                     replaces[Anum_pg_subscription_subdisableonerr - 1]
    1465           6 :                         = true;
    1466             :                 }
    1467             : 
    1468         200 :                 if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
    1469             :                 {
    1470             :                     /* Non-superuser may not disable password_required. */
    1471          12 :                     if (!opts.passwordrequired && !superuser())
    1472           0 :                         ereport(ERROR,
    1473             :                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1474             :                                  errmsg("password_required=false is superuser-only"),
    1475             :                                  errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
    1476             : 
    1477             :                     values[Anum_pg_subscription_subpasswordrequired - 1]
    1478          12 :                         = BoolGetDatum(opts.passwordrequired);
    1479             :                     replaces[Anum_pg_subscription_subpasswordrequired - 1]
    1480          12 :                         = true;
    1481             :                 }
    1482             : 
    1483         200 :                 if (IsSet(opts.specified_opts, SUBOPT_RUN_AS_OWNER))
    1484             :                 {
    1485          14 :                     values[Anum_pg_subscription_subrunasowner - 1] =
    1486          14 :                         BoolGetDatum(opts.runasowner);
    1487          14 :                     replaces[Anum_pg_subscription_subrunasowner - 1] = true;
    1488             :                 }
    1489             : 
    1490         200 :                 if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
    1491             :                 {
    1492             :                     /*
    1493             :                      * We need to update both the slot and the subscription
    1494             :                      * for the two_phase option. We can enable the two_phase
    1495             :                      * option for a slot only once the initial data
    1496             :                      * synchronization is done. This is to avoid missing some
    1497             :                      * data as explained in comments atop worker.c.
    1498             :                      */
    1499           6 :                     update_two_phase = !opts.twophase;
    1500             : 
    1501           6 :                     CheckAlterSubOption(sub, "two_phase", update_two_phase,
    1502             :                                         isTopLevel);
    1503             : 
    1504             :                     /*
    1505             :                      * Modifying the two_phase slot option requires a slot
    1506             :                      * lookup by slot name, so changing the slot name at the
    1507             :                      * same time is not allowed.
    1508             :                      */
    1509           6 :                     if (update_two_phase &&
    1510           2 :                         IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
    1511           0 :                         ereport(ERROR,
    1512             :                                 (errcode(ERRCODE_SYNTAX_ERROR),
    1513             :                                  errmsg("\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
    1514             : 
    1515             :                     /*
    1516             :                      * Note that workers may still survive even if the
    1517             :                      * subscription has been disabled.
    1518             :                      *
    1519             :                      * Ensure workers have already exited to avoid
    1520             :                      * getting prepared transactions while we are disabling
    1521             :                      * the two_phase option. Otherwise, the changes of an
    1522             :                      * already prepared transaction can be replicated again
    1523             :                      * along with its corresponding commit, leading to
    1524             :                      * duplicate data or errors.
    1525             :                      */
    1526           6 :                     if (logicalrep_workers_find(subid, true, true))
    1527           0 :                         ereport(ERROR,
    1528             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1529             :                                  errmsg("cannot alter \"two_phase\" when logical replication worker is still running"),
    1530             :                                  errhint("Try again after some time.")));
    1531             : 
    1532             :                     /*
    1533             :                      * two_phase cannot be disabled if there are any
    1534             :                      * uncommitted prepared transactions present otherwise it
    1535             :                      * can lead to duplicate data or errors as explained in
    1536             :                      * the comment above.
    1537             :                      */
    1538           6 :                     if (update_two_phase &&
    1539           2 :                         sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
    1540           2 :                         LookupGXactBySubid(subid))
    1541           0 :                         ereport(ERROR,
    1542             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1543             :                                  errmsg("cannot disable \"two_phase\" when prepared transactions exist"),
    1544             :                                  errhint("Resolve these transactions and try again.")));
    1545             : 
    1546             :                     /* Change system catalog accordingly */
    1547           6 :                     values[Anum_pg_subscription_subtwophasestate - 1] =
    1548           6 :                         CharGetDatum(opts.twophase ?
    1549             :                                      LOGICALREP_TWOPHASE_STATE_PENDING :
    1550             :                                      LOGICALREP_TWOPHASE_STATE_DISABLED);
    1551           6 :                     replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
    1552             :                 }
    1553             : 
    1554         200 :                 if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
    1555             :                 {
    1556             :                     /*
    1557             :                      * Similar to the two_phase case above, we need to update
    1558             :                      * the failover option for both the slot and the
    1559             :                      * subscription.
    1560             :                      */
    1561          16 :                     update_failover = true;
    1562             : 
    1563          16 :                     CheckAlterSubOption(sub, "failover", update_failover,
    1564             :                                         isTopLevel);
    1565             : 
    1566           8 :                     values[Anum_pg_subscription_subfailover - 1] =
    1567           8 :                         BoolGetDatum(opts.failover);
    1568           8 :                     replaces[Anum_pg_subscription_subfailover - 1] = true;
    1569             :                 }
    1570             : 
    1571         192 :                 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
    1572             :                 {
    1573           4 :                     values[Anum_pg_subscription_subretaindeadtuples - 1] =
    1574           4 :                         BoolGetDatum(opts.retaindeadtuples);
    1575           4 :                     replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true;
    1576             : 
    1577             :                     /*
    1578             :                      * Update the retention status only if there's a change in
    1579             :                      * the retain_dead_tuples option value.
    1580             :                      *
    1581             :                      * Automatically marking retention as active when
    1582             :                      * retain_dead_tuples is enabled may not always be ideal,
    1583             :                      * especially if retention was previously stopped and the
    1584             :                      * user toggles retain_dead_tuples without adjusting the
    1585             :                      * publisher workload. However, this behavior provides a
    1586             :                      * convenient way for users to manually refresh the
    1587             :                      * retention status. Since retention will be stopped again
    1588             :                      * unless the publisher workload is reduced, this approach
    1589             :                      * is acceptable for now.
    1590             :                      */
    1591           4 :                     if (opts.retaindeadtuples != sub->retaindeadtuples)
    1592             :                     {
    1593           4 :                         values[Anum_pg_subscription_subretentionactive - 1] =
    1594           4 :                             BoolGetDatum(opts.retaindeadtuples);
    1595           4 :                         replaces[Anum_pg_subscription_subretentionactive - 1] = true;
    1596             : 
    1597           4 :                         retention_active = opts.retaindeadtuples;
    1598             :                     }
    1599             : 
    1600           4 :                     CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
    1601             : 
    1602             :                     /*
    1603             :                      * Workers may continue running even after the
    1604             :                      * subscription has been disabled.
    1605             :                      *
    1606             :                      * To prevent race conditions (as described in
    1607             :                      * CheckAlterSubOption()), ensure that all worker
    1608             :                      * processes have already exited before proceeding.
    1609             :                      */
    1610           2 :                     if (logicalrep_workers_find(subid, true, true))
    1611           0 :                         ereport(ERROR,
    1612             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1613             :                                  errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
    1614             :                                  errhint("Try again after some time.")));
    1615             : 
    1616             :                     /*
    1617             :                      * Notify the launcher to manage the replication slot for
    1618             :                      * conflict detection. This ensures that replication slot
    1619             :                      * is efficiently handled (created, updated, or dropped)
    1620             :                      * in response to any configuration changes.
    1621             :                      */
    1622           2 :                     ApplyLauncherWakeupAtCommit();
    1623             : 
    1624           2 :                     check_pub_rdt = opts.retaindeadtuples;
    1625           2 :                     retain_dead_tuples = opts.retaindeadtuples;
    1626             :                 }
    1627             : 
    1628         190 :                 if (IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
    1629             :                 {
    1630          10 :                     values[Anum_pg_subscription_submaxretention - 1] =
    1631          10 :                         Int32GetDatum(opts.maxretention);
    1632          10 :                     replaces[Anum_pg_subscription_submaxretention - 1] = true;
    1633             : 
    1634          10 :                     max_retention = opts.maxretention;
    1635             :                 }
    1636             : 
    1637             :                 /*
    1638             :                  * Ensure that system configuration parameters are set
    1639             :                  * appropriately to support retain_dead_tuples and
    1640             :                  * max_retention_duration.
    1641             :                  */
    1642         190 :                 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) ||
    1643         188 :                     IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
    1644          12 :                     CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE,
    1645             :                                                retain_dead_tuples,
    1646             :                                                retention_active,
    1647          12 :                                                (max_retention > 0));
    1648             : 
    1649         190 :                 if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
    1650             :                 {
    1651          10 :                     values[Anum_pg_subscription_suborigin - 1] =
    1652          10 :                         CStringGetTextDatum(opts.origin);
    1653          10 :                     replaces[Anum_pg_subscription_suborigin - 1] = true;
    1654             : 
    1655             :                     /*
    1656             :                      * Check if changes from different origins may be received
    1657             :                      * from the publisher when the origin is changed to ANY
    1658             :                      * and retain_dead_tuples is enabled.
    1659             :                      */
    1660          14 :                     check_pub_rdt = retain_dead_tuples &&
    1661           4 :                         pg_strcasecmp(opts.origin, LOGICALREP_ORIGIN_ANY) == 0;
    1662             : 
    1663          10 :                     origin = opts.origin;
    1664             :                 }
    1665             : 
    1666         190 :                 update_tuple = true;
    1667         190 :                 break;
    1668             :             }
    1669             : 
    1670         104 :         case ALTER_SUBSCRIPTION_ENABLED:
    1671             :             {
    1672         104 :                 parse_subscription_options(pstate, stmt->options,
    1673             :                                            SUBOPT_ENABLED, &opts);
    1674             :                 Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
    1675             : 
    1676         104 :                 if (!sub->slotname && opts.enabled)
    1677           6 :                     ereport(ERROR,
    1678             :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1679             :                              errmsg("cannot enable subscription that does not have a slot name")));
    1680             : 
    1681             :                 /*
    1682             :                  * Check track_commit_timestamp only when enabling the
    1683             :                  * subscription in case it was disabled after creation. See
    1684             :                  * comments atop CheckSubDeadTupleRetention() for details.
    1685             :                  */
    1686          98 :                 CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
    1687          98 :                                            WARNING, sub->retaindeadtuples,
    1688          98 :                                            sub->retentionactive, false);
    1689             : 
    1690          98 :                 values[Anum_pg_subscription_subenabled - 1] =
    1691          98 :                     BoolGetDatum(opts.enabled);
    1692          98 :                 replaces[Anum_pg_subscription_subenabled - 1] = true;
    1693             : 
    1694          98 :                 if (opts.enabled)
    1695          54 :                     ApplyLauncherWakeupAtCommit();
    1696             : 
    1697          98 :                 update_tuple = true;
    1698             : 
    1699             :                 /*
    1700             :                  * The subscription might be initially created with
    1701             :                  * connect=false and retain_dead_tuples=true, meaning the
    1702             :                  * remote server's status may not be checked. Ensure this
    1703             :                  * check is conducted now.
    1704             :                  */
    1705          98 :                 check_pub_rdt = sub->retaindeadtuples && opts.enabled;
    1706          98 :                 break;
    1707             :             }
    1708             : 
    1709          20 :         case ALTER_SUBSCRIPTION_CONNECTION:
    1710             :             /* Load the library providing us libpq calls. */
    1711          20 :             load_file("libpqwalreceiver", false);
    1712             :             /* Check the connection info string. */
    1713          20 :             walrcv_check_conninfo(stmt->conninfo,
    1714             :                                   sub->passwordrequired && !sub->ownersuperuser);
    1715             : 
    1716          14 :             values[Anum_pg_subscription_subconninfo - 1] =
    1717          14 :                 CStringGetTextDatum(stmt->conninfo);
    1718          14 :             replaces[Anum_pg_subscription_subconninfo - 1] = true;
    1719          14 :             update_tuple = true;
    1720             : 
    1721             :             /*
    1722             :              * Since the remote server configuration might have changed,
    1723             :              * perform a check to ensure it permits enabling
    1724             :              * retain_dead_tuples.
    1725             :              */
    1726          14 :             check_pub_rdt = sub->retaindeadtuples;
    1727          14 :             break;
    1728             : 
    1729          32 :         case ALTER_SUBSCRIPTION_SET_PUBLICATION:
    1730             :             {
    1731          32 :                 supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
    1732          32 :                 parse_subscription_options(pstate, stmt->options,
    1733             :                                            supported_opts, &opts);
    1734             : 
    1735          32 :                 values[Anum_pg_subscription_subpublications - 1] =
    1736          32 :                     publicationListToArray(stmt->publication);
    1737          32 :                 replaces[Anum_pg_subscription_subpublications - 1] = true;
    1738             : 
    1739          32 :                 update_tuple = true;
    1740             : 
    1741             :                 /* Refresh if user asked us to. */
    1742          32 :                 if (opts.refresh)
    1743             :                 {
    1744          26 :                     if (!sub->enabled)
    1745           0 :                         ereport(ERROR,
    1746             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1747             :                                  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
    1748             :                                  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
    1749             : 
    1750             :                     /*
    1751             :                      * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
    1752             :                      * why this is not allowed.
    1753             :                      */
    1754          26 :                     if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
    1755           0 :                         ereport(ERROR,
    1756             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1757             :                                  errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
    1758             :                                  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
    1759             : 
    1760          26 :                     PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
    1761             : 
    1762             :                     /* Make sure refresh sees the new list of publications. */
    1763          14 :                     sub->publications = stmt->publication;
    1764             : 
    1765          14 :                     AlterSubscription_refresh(sub, opts.copy_data,
    1766             :                                               stmt->publication);
    1767             :                 }
    1768             : 
    1769          20 :                 break;
    1770             :             }
    1771             : 
    1772          54 :         case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
    1773             :         case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
    1774             :             {
    1775             :                 List       *publist;
    1776          54 :                 bool        isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
    1777             : 
    1778          54 :                 supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
    1779          54 :                 parse_subscription_options(pstate, stmt->options,
    1780             :                                            supported_opts, &opts);
    1781             : 
    1782          54 :                 publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
    1783          18 :                 values[Anum_pg_subscription_subpublications - 1] =
    1784          18 :                     publicationListToArray(publist);
    1785          18 :                 replaces[Anum_pg_subscription_subpublications - 1] = true;
    1786             : 
    1787          18 :                 update_tuple = true;
    1788             : 
    1789             :                 /* Refresh if user asked us to. */
    1790          18 :                 if (opts.refresh)
    1791             :                 {
    1792             :                     /* We only need to validate user specified publications. */
    1793           6 :                     List       *validate_publications = (isadd) ? stmt->publication : NULL;
    1794             : 
    1795           6 :                     if (!sub->enabled)
    1796           0 :                         ereport(ERROR,
    1797             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1798             :                                  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
    1799             :                         /* translator: %s is an SQL ALTER command */
    1800             :                                  errhint("Use %s instead.",
    1801             :                                          isadd ?
    1802             :                                          "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
    1803             :                                          "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
    1804             : 
    1805             :                     /*
    1806             :                      * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
    1807             :                      * why this is not allowed.
    1808             :                      */
    1809           6 :                     if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
    1810           0 :                         ereport(ERROR,
    1811             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1812             :                                  errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
    1813             :                         /* translator: %s is an SQL ALTER command */
    1814             :                                  errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
    1815             :                                          isadd ?
    1816             :                                          "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
    1817             :                                          "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
    1818             : 
    1819           6 :                     PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
    1820             : 
    1821             :                     /* Refresh the new list of publications. */
    1822           6 :                     sub->publications = publist;
    1823             : 
    1824           6 :                     AlterSubscription_refresh(sub, opts.copy_data,
    1825             :                                               validate_publications);
    1826             :                 }
    1827             : 
    1828          18 :                 break;
    1829             :             }
    1830             : 
    1831          62 :         case ALTER_SUBSCRIPTION_REFRESH_PUBLICATION:
    1832             :             {
    1833          62 :                 if (!sub->enabled)
    1834           6 :                     ereport(ERROR,
    1835             :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1836             :                              errmsg("%s is not allowed for disabled subscriptions",
    1837             :                                     "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
    1838             : 
    1839          56 :                 parse_subscription_options(pstate, stmt->options,
    1840             :                                            SUBOPT_COPY_DATA, &opts);
    1841             : 
    1842             :                 /*
    1843             :                  * The subscription option "two_phase" requires that
    1844             :                  * replication has passed the initial table synchronization
    1845             :                  * phase before the two_phase becomes properly enabled.
    1846             :                  *
    1847             :                  * But, having reached this two-phase commit "enabled" state
    1848             :                  * we must not allow any subsequent table initialization to
    1849             :                  * occur. So the ALTER SUBSCRIPTION ... REFRESH PUBLICATION is
    1850             :                  * disallowed when the user had requested two_phase = on mode.
    1851             :                  *
    1852             :                  * The exception to this restriction is when copy_data =
    1853             :                  * false, because when copy_data is false the tablesync will
    1854             :                  * start already in READY state and will exit directly without
    1855             :                  * doing anything.
    1856             :                  *
    1857             :                  * For more details see comments atop worker.c.
    1858             :                  */
    1859          56 :                 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
    1860           0 :                     ereport(ERROR,
    1861             :                             (errcode(ERRCODE_SYNTAX_ERROR),
    1862             :                              errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"),
    1863             :                              errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
    1864             : 
    1865          56 :                 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION");
    1866             : 
    1867          50 :                 AlterSubscription_refresh(sub, opts.copy_data, NULL);
    1868             : 
    1869          48 :                 break;
    1870             :             }
    1871             : 
    1872           0 :         case ALTER_SUBSCRIPTION_REFRESH_SEQUENCES:
    1873             :             {
    1874           0 :                 if (!sub->enabled)
    1875           0 :                     ereport(ERROR,
    1876             :                             errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1877             :                             errmsg("%s is not allowed for disabled subscriptions",
    1878             :                                    "ALTER SUBSCRIPTION ... REFRESH SEQUENCES"));
    1879             : 
    1880           0 :                 AlterSubscription_refresh_seq(sub);
    1881             : 
    1882           0 :                 break;
    1883             :             }
    1884             : 
    1885          24 :         case ALTER_SUBSCRIPTION_SKIP:
    1886             :             {
    1887          24 :                 parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
    1888             : 
    1889             :                 /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
    1890             :                 Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
    1891             : 
    1892             :                 /*
    1893             :                  * If the user sets subskiplsn, we do a sanity check to make
    1894             :                  * sure that the specified LSN is a probable value.
    1895             :                  */
    1896          18 :                 if (!XLogRecPtrIsInvalid(opts.lsn))
    1897             :                 {
    1898             :                     RepOriginId originid;
    1899             :                     char        originname[NAMEDATALEN];
    1900             :                     XLogRecPtr  remote_lsn;
    1901             : 
    1902          12 :                     ReplicationOriginNameForLogicalRep(subid, InvalidOid,
    1903             :                                                        originname, sizeof(originname));
    1904          12 :                     originid = replorigin_by_name(originname, false);
    1905          12 :                     remote_lsn = replorigin_get_progress(originid, false);
    1906             : 
    1907             :                     /* Check the given LSN is at least a future LSN */
    1908          12 :                     if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn)
    1909           0 :                         ereport(ERROR,
    1910             :                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1911             :                                  errmsg("skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
    1912             :                                         LSN_FORMAT_ARGS(opts.lsn),
    1913             :                                         LSN_FORMAT_ARGS(remote_lsn))));
    1914             :                 }
    1915             : 
    1916          18 :                 values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
    1917          18 :                 replaces[Anum_pg_subscription_subskiplsn - 1] = true;
    1918             : 
    1919          18 :                 update_tuple = true;
    1920          18 :                 break;
    1921             :             }
    1922             : 
    1923           0 :         default:
    1924           0 :             elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
    1925             :                  stmt->kind);
    1926             :     }
    1927             : 
    1928             :     /* Update the catalog if needed. */
    1929         406 :     if (update_tuple)
    1930             :     {
    1931         358 :         tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
    1932             :                                 replaces);
    1933             : 
    1934         358 :         CatalogTupleUpdate(rel, &tup->t_self, tup);
    1935             : 
    1936         358 :         heap_freetuple(tup);
    1937             :     }
    1938             : 
    1939             :     /*
    1940             :      * Try to acquire the connection necessary either for modifying the slot
    1941             :      * or for checking if the remote server permits enabling
    1942             :      * retain_dead_tuples.
    1943             :      *
    1944             :      * This has to be at the end because otherwise if there is an error while
    1945             :      * doing the database operations we won't be able to roll back the altered
    1946             :      * slot.
    1947             :      */
    1948         406 :     if (update_failover || update_two_phase || check_pub_rdt)
    1949             :     {
    1950             :         bool        must_use_password;
    1951             :         char       *err;
    1952             :         WalReceiverConn *wrconn;
    1953             : 
    1954             :         /* Load the library providing us libpq calls. */
    1955          26 :         load_file("libpqwalreceiver", false);
    1956             : 
    1957             :         /*
    1958             :          * Try to connect to the publisher, using the new connection string if
    1959             :          * available.
    1960             :          */
    1961          26 :         must_use_password = sub->passwordrequired && !sub->ownersuperuser;
    1962          26 :         wrconn = walrcv_connect(stmt->conninfo ? stmt->conninfo : sub->conninfo,
    1963             :                                 true, true, must_use_password, sub->name,
    1964             :                                 &err);
    1965          26 :         if (!wrconn)
    1966           0 :             ereport(ERROR,
    1967             :                     (errcode(ERRCODE_CONNECTION_FAILURE),
    1968             :                      errmsg("subscription \"%s\" could not connect to the publisher: %s",
    1969             :                             sub->name, err)));
    1970             : 
    1971          26 :         PG_TRY();
    1972             :         {
    1973          26 :             if (retain_dead_tuples)
    1974          18 :                 check_pub_dead_tuple_retention(wrconn);
    1975             : 
    1976          26 :             check_publications_origin_tables(wrconn, sub->publications, false,
    1977             :                                              retain_dead_tuples, origin, NULL, 0,
    1978             :                                              sub->name);
    1979             : 
    1980          26 :             if (update_failover || update_two_phase)
    1981          10 :                 walrcv_alter_slot(wrconn, sub->slotname,
    1982             :                                   update_failover ? &opts.failover : NULL,
    1983             :                                   update_two_phase ? &opts.twophase : NULL);
    1984             :         }
    1985           0 :         PG_FINALLY();
    1986             :         {
    1987          26 :             walrcv_disconnect(wrconn);
    1988             :         }
    1989          26 :         PG_END_TRY();
    1990             :     }
    1991             : 
    1992         406 :     table_close(rel, RowExclusiveLock);
    1993             : 
    1994         406 :     ObjectAddressSet(myself, SubscriptionRelationId, subid);
    1995             : 
    1996         406 :     InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
    1997             : 
    1998             :     /* Wake up related replication workers to handle this change quickly. */
    1999         406 :     LogicalRepWorkersWakeupAtCommit(subid);
    2000             : 
    2001         406 :     return myself;
    2002             : }
    2003             : 
    2004             : /*
    2005             :  * Drop a subscription
                     :  *
                     :  * Looks up the subscription named stmt->subname in the current database.
                     :  * If it does not exist, an ERROR is raised unless stmt->missing_ok is set,
                     :  * in which case a NOTICE is emitted and nothing else is done.
                     :  *
                     :  * Otherwise the caller must own the subscription.  The subscription object
                     :  * is locked with AccessExclusiveLock, its pg_subscription tuple is deleted,
                     :  * its workers are stopped, its replication origins, shared dependency
                     :  * records and relation synchronization states are removed, and the
                     :  * cumulative stats system is told about the drop.
                     :  *
                     :  * If the subscription has a slot name, the command is rejected inside a
                     :  * transaction block (isTopLevel is passed to PreventInTransactionBlock),
                     :  * and the slot is dropped on the publisher as the very last step, after any
                     :  * leftover tablesync slots.  If there is no slot name but relation states
                     :  * remain, a publisher connection is still attempted in order to drop the
                     :  * tablesync slots; failure to connect is then silently ignored.
    2006             :  */
    2007             : void
    2008         246 : DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
    2009             : {
    2010             :     Relation    rel;
    2011             :     ObjectAddress myself;
    2012             :     HeapTuple   tup;
    2013             :     Oid         subid;
    2014             :     Oid         subowner;
    2015             :     Datum       datum;
    2016             :     bool        isnull;
    2017             :     char       *subname;
    2018             :     char       *conninfo;
    2019             :     char       *slotname;
    2020             :     List       *subworkers;
    2021             :     ListCell   *lc;
    2022             :     char        originname[NAMEDATALEN];
    2023         246 :     char       *err = NULL;
    2024             :     WalReceiverConn *wrconn;
    2025             :     Form_pg_subscription form;
    2026             :     List       *rstates;
    2027             :     bool        must_use_password;
    2028             : 
    2029             :     /*
    2030             :      * The launcher may concurrently start a new worker for this subscription.
    2031             :      * During initialization, the worker checks for subscription validity and
    2032             :      * exits if the subscription has already been dropped. See
    2033             :      * InitializeLogRepWorker.
    2034             :      */
    2035         246 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    2036             : 
    2037         246 :     tup = SearchSysCache2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
    2038         246 :                           CStringGetDatum(stmt->subname));
    2039             : 
    2040         246 :     if (!HeapTupleIsValid(tup))
    2041             :     {
    2042          12 :         table_close(rel, NoLock);
    2043             : 
    2044          12 :         if (!stmt->missing_ok)
    2045           6 :             ereport(ERROR,
    2046             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    2047             :                      errmsg("subscription \"%s\" does not exist",
    2048             :                             stmt->subname)));
    2049             :         else
    2050           6 :             ereport(NOTICE,
    2051             :                     (errmsg("subscription \"%s\" does not exist, skipping",
    2052             :                             stmt->subname)));
    2053             : 
    2054          92 :         return;
    2055             :     }
    2056             : 
    2057         234 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    2058         234 :     subid = form->oid;
    2059         234 :     subowner = form->subowner;
    2060         234 :     must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
    2061             : 
    2062             :     /* must be owner */
    2063         234 :     if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
    2064           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
    2065           0 :                        stmt->subname);
    2066             : 
    2067             :     /* DROP hook for the subscription being removed */
    2068         234 :     InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
    2069             : 
    2070             :     /*
    2071             :      * Lock the subscription so nobody else can do anything with it (including
    2072             :      * the replication workers).
    2073             :      */
    2074         234 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
    2075             : 
    2076             :     /* Get subname */
    2077         234 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
    2078             :                                    Anum_pg_subscription_subname);
    2079         234 :     subname = pstrdup(NameStr(*DatumGetName(datum)));
    2080             : 
    2081             :     /* Get conninfo */
    2082         234 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
    2083             :                                    Anum_pg_subscription_subconninfo);
    2084         234 :     conninfo = TextDatumGetCString(datum);
    2085             : 
    2086             :     /* Get slotname */
    2087         234 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
    2088             :                             Anum_pg_subscription_subslotname, &isnull);
    2089         234 :     if (!isnull)
    2090         148 :         slotname = pstrdup(NameStr(*DatumGetName(datum)));
    2091             :     else
    2092          86 :         slotname = NULL;
    2093             : 
    2094             :     /*
    2095             :      * Since dropping a replication slot is not transactional, the replication
    2096             :      * slot stays dropped even if the transaction rolls back.  So we cannot
    2097             :      * run DROP SUBSCRIPTION inside a transaction block if dropping the
    2098             :      * replication slot.  Also, in this case, we report a message for dropping
    2099             :      * the subscription to the cumulative stats system.
    2100             :      *
    2101             :      * XXX The command name should really be something like "DROP SUBSCRIPTION
    2102             :      * of a subscription that is associated with a replication slot", but we
    2103             :      * don't have the proper facilities for that.
    2104             :      */
    2105         234 :     if (slotname)
    2106         148 :         PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
    2107             : 
    2108         228 :     ObjectAddressSet(myself, SubscriptionRelationId, subid);
    2109         228 :     EventTriggerSQLDropAddObject(&myself, true, true);
    2110             : 
    2111             :     /* Remove the tuple from catalog. */
    2112         228 :     CatalogTupleDelete(rel, &tup->t_self);
    2113             : 
    2114         228 :     ReleaseSysCache(tup);
    2115             : 
    2116             :     /*
    2117             :      * Stop all the subscription workers immediately.
    2118             :      *
    2119             :      * This is necessary if we are dropping the replication slot, so that the
    2120             :      * slot becomes accessible.
    2121             :      *
    2122             :      * It is also necessary if the subscription is disabled and was disabled
    2123             :      * in the same transaction.  Then the workers haven't seen the disabling
    2124             :      * yet and will still be running, leading to hangs later when we want to
    2125             :      * drop the replication origin.  If the subscription was disabled before
    2126             :      * this transaction, then there shouldn't be any workers left, so this
    2127             :      * won't make a difference.
    2128             :      *
    2129             :      * New workers won't be started because we hold an exclusive lock on the
    2130             :      * subscription till the end of the transaction.
    2131             :      */
    2132         228 :     subworkers = logicalrep_workers_find(subid, false, true);
    2133         374 :     foreach(lc, subworkers)
    2134             :     {
    2135         146 :         LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
    2136             : 
    2137         146 :         logicalrep_worker_stop(w->type, w->subid, w->relid);
    2138             :     }
    2139         228 :     list_free(subworkers);
    2140             : 
    2141             :     /*
    2142             :      * Remove the no-longer-useful entry in the launcher's table of apply
    2143             :      * worker start times.
    2144             :      *
    2145             :      * If this transaction rolls back, the launcher might restart a failed
    2146             :      * apply worker before wal_retrieve_retry_interval milliseconds have
    2147             :      * elapsed, but that's pretty harmless.
    2148             :      */
    2149         228 :     ApplyLauncherForgetWorkerStartTime(subid);
    2150             : 
    2151             :     /*
    2152             :      * Cleanup of tablesync replication origins.
    2153             :      *
    2154             :      * Any READY-state relations would already have dealt with clean-ups.
    2155             :      *
    2156             :      * Note that the state can't change because we have already stopped both
    2157             :      * the apply and tablesync workers and they can't restart because of
    2158             :      * exclusive lock on the subscription.
    2159             :      */
    2160         228 :     rstates = GetSubscriptionRelations(subid, true, false, true);
    2161         238 :     foreach(lc, rstates)
    2162             :     {
    2163          10 :         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
    2164          10 :         Oid         relid = rstate->relid;
    2165             : 
    2166             :         /* Only cleanup resources of tablesync workers */
    2167          10 :         if (!OidIsValid(relid))
    2168           0 :             continue;
    2169             : 
    2170             :         /*
    2171             :          * Drop the tablesync's origin tracking if exists.
    2172             :          *
    2173             :          * It is possible that the origin is not yet created for tablesync
    2174             :          * worker so passing missing_ok = true. This can happen for the states
    2175             :          * before SUBREL_STATE_FINISHEDCOPY.
    2176             :          */
    2177          10 :         ReplicationOriginNameForLogicalRep(subid, relid, originname,
    2178             :                                            sizeof(originname));
    2179          10 :         replorigin_drop_by_name(originname, true, false);
    2180             :     }
    2181             : 
    2182             :     /* Clean up dependencies */
    2183         228 :     deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
    2184             : 
    2185             :     /* Remove any associated relation synchronization states. */
    2186         228 :     RemoveSubscriptionRel(subid, InvalidOid);
    2187             : 
    2188             :     /* Remove the origin tracking if exists. */
    2189         228 :     ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
    2190         228 :     replorigin_drop_by_name(originname, true, false);
    2191             : 
    2192             :     /*
    2193             :      * Tell the cumulative stats system that the subscription is getting
    2194             :      * dropped.
    2195             :      */
    2196         228 :     pgstat_drop_subscription(subid);
    2197             : 
    2198             :     /*
    2199             :      * If there is no slot associated with the subscription and no relation
    2200             :      * states are left whose tablesync slots may need dropping, we can
                     :      * finish here.
    2201             :      */
    2202         228 :     if (!slotname && rstates == NIL)
    2203             :     {
    2204          86 :         table_close(rel, NoLock);
    2205          86 :         return;
    2206             :     }
    2207             : 
    2208             :     /*
    2209             :      * Try to acquire the connection necessary for dropping slots.
    2210             :      *
    2211             :      * Note: If the slotname is NONE/NULL then we allow the command to finish
    2212             :      * and users need to manually cleanup the apply and tablesync worker slots
    2213             :      * later.
    2214             :      *
    2215             :      * This has to be at the end because otherwise if there is an error while
    2216             :      * doing the database operations we won't be able to rollback dropped
    2217             :      * slot.
    2218             :      */
    2219         142 :     load_file("libpqwalreceiver", false);
    2220             : 
    2221         142 :     wrconn = walrcv_connect(conninfo, true, true, must_use_password,
    2222             :                             subname, &err);
    2223         142 :     if (wrconn == NULL)
    2224             :     {
    2225           0 :         if (!slotname)
    2226             :         {
    2227             :             /* be tidy */
    2228           0 :             list_free(rstates);
    2229           0 :             table_close(rel, NoLock);
    2230           0 :             return;
    2231             :         }
    2232             :         else
    2233             :         {
    2234           0 :             ReportSlotConnectionError(rstates, subid, slotname, err);
    2235             :         }
    2236             :     }
    2237             : 
    2238         142 :     PG_TRY();
    2239             :     {
    2240         152 :         foreach(lc, rstates)
    2241             :         {
    2242          10 :             SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
    2243          10 :             Oid         relid = rstate->relid;
    2244             : 
    2245             :             /* Only cleanup resources of tablesync workers */
    2246          10 :             if (!OidIsValid(relid))
    2247           0 :                 continue;
    2248             : 
    2249             :             /*
    2250             :              * Drop the tablesync slots associated with removed tables.
    2251             :              *
    2252             :              * For SYNCDONE/READY states, the tablesync slot is known to have
    2253             :              * already been dropped by the tablesync worker.
    2254             :              *
    2255             :              * For other states, there is no certainty, maybe the slot does
    2256             :              * not exist yet. Also, if we fail after removing some of the
    2257             :              * slots, next time, it will again try to drop already dropped
    2258             :              * slots and fail. For these reasons, we allow missing_ok = true
    2259             :              * for the drop.
    2260             :              */
    2261          10 :             if (rstate->state != SUBREL_STATE_SYNCDONE)
    2262             :             {
    2263          10 :                 char        syncslotname[NAMEDATALEN] = {0};
    2264             : 
    2265          10 :                 ReplicationSlotNameForTablesync(subid, relid, syncslotname,
    2266             :                                                 sizeof(syncslotname));
    2267          10 :                 ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
    2268             :             }
    2269             :         }
    2270             : 
    2271         142 :         list_free(rstates);
    2272             : 
    2273             :         /*
    2274             :          * If there is a slot associated with the subscription, then drop the
    2275             :          * replication slot at the publisher.
    2276             :          */
    2277         142 :         if (slotname)
    2278         142 :             ReplicationSlotDropAtPubNode(wrconn, slotname, false);
    2279             :     }
    2280           2 :     PG_FINALLY();
    2281             :     {
    2282         142 :         walrcv_disconnect(wrconn);
    2283             :     }
    2284         142 :     PG_END_TRY();
    2285             : 
    2286         140 :     table_close(rel, NoLock);
    2287             : }
    2288             : 
    2289             : /*
    2290             :  * Drop the replication slot at the publisher node using the replication
    2291             :  * connection.
    2292             :  *
    2293             :  * missing_ok - if true then only issue a LOG message if the slot doesn't
    2294             :  * exist.
                     :  *
                     :  * Sends "DROP_REPLICATION_SLOT <slotname> WAIT" over wrconn, with the slot
                     :  * name passed through quote_identifier().  On success a NOTICE is emitted.
                     :  * Any failure other than the missing_ok case above (publisher reports
                     :  * ERRCODE_UNDEFINED_OBJECT) raises an ERROR with ERRCODE_CONNECTION_FAILURE
                     :  * that includes the error text returned in the result.
                     :  *
                     :  * The command buffer is released in PG_FINALLY, i.e. on both the normal
                     :  * and the error path.
    2295             :  */
    2296             : void
    2297         534 : ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
    2298             : {
    2299             :     StringInfoData cmd;
    2300             : 
    2301             :     Assert(wrconn);
    2302             : 
    2303         534 :     load_file("libpqwalreceiver", false);
    2304             : 
    2305         534 :     initStringInfo(&cmd);
    2306         534 :     appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
    2307             : 
    2308         534 :     PG_TRY();
    2309             :     {
    2310             :         WalRcvExecResult *res;
    2311             : 
    2312         534 :         res = walrcv_exec(wrconn, cmd.data, 0, NULL);
    2313             : 
    2314         534 :         if (res->status == WALRCV_OK_COMMAND)
    2315             :         {
    2316             :             /* NOTICE. Success. */
    2317         530 :             ereport(NOTICE,
    2318             :                     (errmsg("dropped replication slot \"%s\" on publisher",
    2319             :                             slotname)));
    2320             :         }
    2321           4 :         else if (res->status == WALRCV_ERROR &&
    2322           2 :                  missing_ok &&
    2323           2 :                  res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
    2324             :         {
    2325             :             /* LOG. Error, but missing_ok = true. */
    2326           2 :             ereport(LOG,
    2327             :                     (errmsg("could not drop replication slot \"%s\" on publisher: %s",
    2328             :                             slotname, res->err)));
    2329             :         }
    2330             :         else
    2331             :         {
    2332             :             /* ERROR. */
    2333           2 :             ereport(ERROR,
    2334             :                     (errcode(ERRCODE_CONNECTION_FAILURE),
    2335             :                      errmsg("could not drop replication slot \"%s\" on publisher: %s",
    2336             :                             slotname, res->err)));
    2337             :         }
    2338             : 
    2339         532 :         walrcv_clear_result(res);
    2340             :     }
    2341           2 :     PG_FINALLY();
    2342             :     {
    2343         534 :         pfree(cmd.data);
    2344             :     }
    2345         534 :     PG_END_TRY();
    2346         532 : }
    2347             : 
    2348             : /*
    2349             :  * Internal workhorse for changing a subscription owner
                     :  *
                     :  * rel is the opened pg_subscription relation and tup a modifiable copy of
                     :  * the subscription's tuple (both callers in this file obtain it from a
                     :  * SearchSysCacheCopy lookup); tup is updated in place and written back
                     :  * with CatalogTupleUpdate.
                     :  *
                     :  * Nothing is done if newOwnerId already owns the subscription.  Otherwise,
                     :  * in this order: the current user must own the subscription, must be a
                     :  * superuser if the subscription has password_required=false, must be able
                     :  * to become the new owner (check_can_set_role), and must have CREATE
                     :  * privilege on the current database.  After the catalog update the owner
                     :  * dependency is changed, the post-alter hook is invoked, and the launcher
                     :  * and the subscription's workers are woken up at commit.
    2350             :  */
    2351             : static void
    2352          18 : AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
    2353             : {
    2354             :     Form_pg_subscription form;
    2355             :     AclResult   aclresult;
    2356             : 
    2357          18 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    2358             : 
    2359          18 :     if (form->subowner == newOwnerId)
    2360           4 :         return;
    2361             : 
    2362          14 :     if (!object_ownercheck(SubscriptionRelationId, form->oid, GetUserId()))
    2363           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
    2364           0 :                        NameStr(form->subname));
    2365             : 
    2366             :     /*
    2367             :      * Don't allow non-superuser modification of a subscription with
    2368             :      * password_required=false.
    2369             :      */
    2370          14 :     if (!form->subpasswordrequired && !superuser())
    2371           0 :         ereport(ERROR,
    2372             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    2373             :                  errmsg("password_required=false is superuser-only"),
    2374             :                  errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
    2375             : 
    2376             :     /* Must be able to become new owner */
    2377          14 :     check_can_set_role(GetUserId(), newOwnerId);
    2378             : 
    2379             :     /*
    2380             :      * current owner must have CREATE on database
    2381             :      *
    2382             :      * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
    2383             :      * other object types behave differently (e.g. you can't give a table to a
    2384             :      * user who lacks CREATE privileges on a schema).
    2385             :      */
    2386           8 :     aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
    2387             :                                 GetUserId(), ACL_CREATE);
    2388           8 :     if (aclresult != ACLCHECK_OK)
    2389           0 :         aclcheck_error(aclresult, OBJECT_DATABASE,
    2390           0 :                        get_database_name(MyDatabaseId));
    2391             : 
    2392           8 :     form->subowner = newOwnerId;
    2393           8 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    2394             : 
    2395             :     /* Update owner dependency reference */
    2396           8 :     changeDependencyOnOwner(SubscriptionRelationId,
    2397             :                             form->oid,
    2398             :                             newOwnerId);
    2399             : 
    2400           8 :     InvokeObjectPostAlterHook(SubscriptionRelationId,
    2401             :                               form->oid, 0);
    2402             : 
    2403             :     /* Wake up related background processes to handle this change quickly. */
    2404           8 :     ApplyLauncherWakeupAtCommit();
    2405           8 :     LogicalRepWorkersWakeupAtCommit(form->oid);
    2406             : }
    2407             : 
    2408             : /*
    2409             :  * Change subscription owner -- by name
                     :  *
                     :  * Looks up the subscription called "name" in the current database and
                     :  * raises an ERROR if it does not exist.  The permission checks and the
                     :  * catalog update are done by AlterSubscriptionOwner_internal on a copy of
                     :  * the tuple, which is freed here afterwards.
                     :  *
                     :  * Returns the ObjectAddress of the subscription.
    2410             :  */
    2411             : ObjectAddress
    2412          18 : AlterSubscriptionOwner(const char *name, Oid newOwnerId)
    2413             : {
    2414             :     Oid         subid;
    2415             :     HeapTuple   tup;
    2416             :     Relation    rel;
    2417             :     ObjectAddress address;
    2418             :     Form_pg_subscription form;
    2419             : 
    2420          18 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    2421             : 
    2422          18 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
    2423             :                               CStringGetDatum(name));
    2424             : 
    2425          18 :     if (!HeapTupleIsValid(tup))
    2426           0 :         ereport(ERROR,
    2427             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2428             :                  errmsg("subscription \"%s\" does not exist", name)));
    2429             : 
    2430          18 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    2431          18 :     subid = form->oid;
    2432             : 
    2433          18 :     AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
    2434             : 
    2435          12 :     ObjectAddressSet(address, SubscriptionRelationId, subid);
    2436             : 
    2437          12 :     heap_freetuple(tup);
    2438             : 
    2439          12 :     table_close(rel, RowExclusiveLock);
    2440             : 
    2441          12 :     return address;
    2442             : }
    2443             : 
    2444             : /*
    2445             :  * Change subscription owner -- by OID
                     :  *
                     :  * Same as AlterSubscriptionOwner, except that the subscription is looked
                     :  * up by its OID (an ERROR is raised if no such subscription exists) and
                     :  * nothing is returned.
    2446             :  */
    2447             : void
    2448           0 : AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
    2449             : {
    2450             :     HeapTuple   tup;
    2451             :     Relation    rel;
    2452             : 
    2453           0 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    2454             : 
    2455           0 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
    2456             : 
    2457           0 :     if (!HeapTupleIsValid(tup))
    2458           0 :         ereport(ERROR,
    2459             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2460             :                  errmsg("subscription with OID %u does not exist", subid)));
    2461             : 
    2462           0 :     AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
    2463             : 
    2464           0 :     heap_freetuple(tup);
    2465             : 
    2466           0 :     table_close(rel, RowExclusiveLock);
    2467           0 : }
    2468             : 
    2469             : /*
    2470             :  * Check and log a warning if the publisher has subscribed to the same table,
    2471             :  * its partition ancestors (if it's a partition), or its partition children (if
    2472             :  * it's a partitioned table), from some other publishers. This check is
    2473             :  * required in the following scenarios:
    2474             :  *
    2475             :  * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
    2476             :  *    statements with "copy_data = true" and "origin = none":
    2477             :  *    - Warn the user that data with an origin might have been copied.
    2478             :  *    - This check is skipped for tables already added, as incremental sync via
    2479             :  *      WAL allows origin tracking. The list of such tables is in
    2480             :  *      subrel_local_oids.
    2481             :  *
    2482             :  * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
    2483             :  *    statements with "retain_dead_tuples = true" and "origin = any", and for
    2484             :  *    ALTER SUBSCRIPTION statements that modify retain_dead_tuples or origin,
    2485             :  *    or when the publisher's status changes (e.g., due to a connection string
    2486             :  *    update):
    2487             :  *    - Warn the user that only conflict detection info for local changes on
    2488             :  *      the publisher is retained. Data from other origins may lack sufficient
    2489             :  *      details for reliable conflict detection.
    2490             :  *    - See comments atop worker.c for more details.
    2491             :  */
    2492             : static void
    2493         332 : check_publications_origin_tables(WalReceiverConn *wrconn, List *publications,
    2494             :                                  bool copydata, bool retain_dead_tuples,
    2495             :                                  char *origin, Oid *subrel_local_oids,
    2496             :                                  int subrel_count, char *subname)
    2497             : {
    2498             :     WalRcvExecResult *res;
    2499             :     StringInfoData cmd;
    2500             :     TupleTableSlot *slot;
    2501         332 :     Oid         tableRow[1] = {TEXTOID};
    2502         332 :     List       *publist = NIL;
    2503             :     int         i;
    2504             :     bool        check_rdt;
    2505             :     bool        check_table_sync;
    2506         664 :     bool        origin_none = origin &&
    2507         332 :         pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0;
    2508             : 
    2509             :     /*
    2510             :      * Enable retain_dead_tuples checks only when origin is set to 'any',
    2511             :      * since with origin='none' only local changes are replicated to the
    2512             :      * subscriber.
    2513             :      */
    2514         332 :     check_rdt = retain_dead_tuples && !origin_none;
    2515             : 
    2516             :     /*
    2517             :      * Enable table synchronization checks only when origin is 'none', to
    2518             :      * ensure that data from other origins is not inadvertently copied.
    2519             :      */
    2520         332 :     check_table_sync = copydata && origin_none;
    2521             : 
    2522             :     /* retain_dead_tuples and table sync checks occur separately */
    2523             :     Assert(!(check_rdt && check_table_sync));
    2524             : 
    2525             :     /* Return if no checks are required */
    2526         332 :     if (!check_rdt && !check_table_sync)
    2527         304 :         return;
    2528             : 
    2529          28 :     initStringInfo(&cmd);
    2530          28 :     appendStringInfoString(&cmd,
    2531             :                            "SELECT DISTINCT P.pubname AS pubname\n"
    2532             :                            "FROM pg_publication P,\n"
    2533             :                            "     LATERAL pg_get_publication_tables(P.pubname) GPT\n"
    2534             :                            "     JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid OR"
    2535             :                            "     GPT.relid IN (SELECT relid FROM pg_partition_ancestors(PS.srrelid) UNION"
    2536             :                            "                   SELECT relid FROM pg_partition_tree(PS.srrelid))),\n"
    2537             :                            "     pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
    2538             :                            "WHERE C.oid = GPT.relid AND P.pubname IN (");
    2539          28 :     GetPublicationsStr(publications, &cmd, true);
    2540          28 :     appendStringInfoString(&cmd, ")\n");
    2541             : 
    2542             :     /*
    2543             :      * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
    2544             :      * subrel_local_oids contains the list of relation oids that are already
    2545             :      * present on the subscriber. This check should be skipped for these
    2546             :      * tables if checking for table sync scenario. However, when handling the
    2547             :      * retain_dead_tuples scenario, ensure all tables are checked, as some
    2548             :      * existing tables may now include changes from other origins due to newly
    2549             :      * created subscriptions on the publisher.
    2550             :      */
    2551          28 :     if (check_table_sync)
    2552             :     {
    2553          28 :         for (i = 0; i < subrel_count; i++)
    2554             :         {
    2555           8 :             Oid         relid = subrel_local_oids[i];
    2556           8 :             char       *schemaname = get_namespace_name(get_rel_namespace(relid));
    2557           8 :             char       *tablename = get_rel_name(relid);
    2558             : 
    2559           8 :             appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
    2560             :                              schemaname, tablename);
    2561             :         }
    2562             :     }
    2563             : 
    2564          28 :     res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
    2565          28 :     pfree(cmd.data);
    2566             : 
    2567          28 :     if (res->status != WALRCV_OK_TUPLES)
    2568           0 :         ereport(ERROR,
    2569             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    2570             :                  errmsg("could not receive list of replicated tables from the publisher: %s",
    2571             :                         res->err)));
    2572             : 
    2573             :     /* Process publications. */
    2574          28 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    2575          38 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    2576             :     {
    2577             :         char       *pubname;
    2578             :         bool        isnull;
    2579             : 
    2580          10 :         pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
    2581             :         Assert(!isnull);
    2582             : 
    2583          10 :         ExecClearTuple(slot);
    2584          10 :         publist = list_append_unique(publist, makeString(pubname));
    2585             :     }
    2586             : 
    2587             :     /*
    2588             :      * Log a warning if the publisher has subscribed to the same table from
    2589             :      * some other publisher. We cannot know the origin of data during the
    2590             :      * initial sync. Data origins can be found only from the WAL by looking at
    2591             :      * the origin id.
    2592             :      *
    2593             :      * XXX: For simplicity, we don't check whether the table has any data or
    2594             :      * not. If the table doesn't have any data then we don't need to
    2595             :      * distinguish between data having origin and data not having origin so we
    2596             :      * can avoid logging a warning for table sync scenario.
    2597             :      */
    2598          28 :     if (publist)
    2599             :     {
    2600          10 :         StringInfo  pubnames = makeStringInfo();
    2601          10 :         StringInfo  err_msg = makeStringInfo();
    2602          10 :         StringInfo  err_hint = makeStringInfo();
    2603             : 
    2604             :         /* Prepare the list of publication(s) for warning message. */
    2605          10 :         GetPublicationsStr(publist, pubnames, false);
    2606             : 
    2607          10 :         if (check_table_sync)
    2608             :         {
    2609           8 :             appendStringInfo(err_msg, _("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin"),
    2610             :                              subname);
    2611           8 :             appendStringInfoString(err_hint, _("Verify that initial data copied from the publisher tables did not come from other origins."));
    2612             :         }
    2613             :         else
    2614             :         {
    2615           2 :             appendStringInfo(err_msg, _("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins"),
    2616             :                              subname);
    2617           2 :             appendStringInfoString(err_hint, _("Consider using origin = NONE or disabling retain_dead_tuples."));
    2618             :         }
    2619             : 
    2620          10 :         ereport(WARNING,
    2621             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2622             :                 errmsg_internal("%s", err_msg->data),
    2623             :                 errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
    2624             :                                  "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
    2625             :                                  list_length(publist), pubnames->data),
    2626             :                 errhint_internal("%s", err_hint->data));
    2627             :     }
    2628             : 
    2629          28 :     ExecDropSingleTupleTableSlot(slot);
    2630             : 
    2631          28 :     walrcv_clear_result(res);
    2632             : }
    2633             : 
    2634             : /*
    2635             :  * This function is similar to check_publications_origin_tables and serves
    2636             :  * same purpose for sequences.
    2637             :  */
    2638             : static void
    2639         306 : check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications,
    2640             :                                     bool copydata, char *origin,
    2641             :                                     Oid *subrel_local_oids, int subrel_count,
    2642             :                                     char *subname)
    2643             : {
    2644             :     WalRcvExecResult *res;
    2645             :     StringInfoData cmd;
    2646             :     TupleTableSlot *slot;
    2647         306 :     Oid         tableRow[1] = {TEXTOID};
    2648         306 :     List       *publist = NIL;
    2649             : 
    2650             :     /*
    2651             :      * Enable sequence synchronization checks only when origin is 'none' , to
    2652             :      * ensure that sequence data from other origins is not inadvertently
    2653             :      * copied.
    2654             :      */
    2655         306 :     if (!copydata || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0)
    2656         286 :         return;
    2657             : 
    2658          20 :     initStringInfo(&cmd);
    2659          20 :     appendStringInfoString(&cmd,
    2660             :                            "SELECT DISTINCT P.pubname AS pubname\n"
    2661             :                            "FROM pg_publication P,\n"
    2662             :                            "     LATERAL pg_get_publication_sequences(P.pubname) GPS\n"
    2663             :                            "     JOIN pg_subscription_rel PS ON (GPS.relid = PS.srrelid),\n"
    2664             :                            "     pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
    2665             :                            "WHERE C.oid = GPS.relid AND P.pubname IN (");
    2666             : 
    2667          20 :     GetPublicationsStr(publications, &cmd, true);
    2668          20 :     appendStringInfoString(&cmd, ")\n");
    2669             : 
    2670             :     /*
    2671             :      * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
    2672             :      * subrel_local_oids contains the list of relations that are already
    2673             :      * present on the subscriber. This check should be skipped as these will
    2674             :      * not be re-synced.
    2675             :      */
    2676          20 :     for (int i = 0; i < subrel_count; i++)
    2677             :     {
    2678           0 :         Oid         relid = subrel_local_oids[i];
    2679           0 :         char       *schemaname = get_namespace_name(get_rel_namespace(relid));
    2680           0 :         char       *seqname = get_rel_name(relid);
    2681             : 
    2682           0 :         appendStringInfo(&cmd,
    2683             :                          "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
    2684             :                          schemaname, seqname);
    2685             :     }
    2686             : 
    2687          20 :     res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
    2688          20 :     pfree(cmd.data);
    2689             : 
    2690          20 :     if (res->status != WALRCV_OK_TUPLES)
    2691           0 :         ereport(ERROR,
    2692             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    2693             :                  errmsg("could not receive list of replicated sequences from the publisher: %s",
    2694             :                         res->err)));
    2695             : 
    2696             :     /* Process publications. */
    2697          20 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    2698          20 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    2699             :     {
    2700             :         char       *pubname;
    2701             :         bool        isnull;
    2702             : 
    2703           0 :         pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
    2704             :         Assert(!isnull);
    2705             : 
    2706           0 :         ExecClearTuple(slot);
    2707           0 :         publist = list_append_unique(publist, makeString(pubname));
    2708             :     }
    2709             : 
    2710             :     /*
    2711             :      * Log a warning if the publisher has subscribed to the same sequence from
    2712             :      * some other publisher. We cannot know the origin of sequences data
    2713             :      * during the initial sync.
    2714             :      */
    2715          20 :     if (publist)
    2716             :     {
    2717           0 :         StringInfo  pubnames = makeStringInfo();
    2718           0 :         StringInfo  err_msg = makeStringInfo();
    2719           0 :         StringInfo  err_hint = makeStringInfo();
    2720             : 
    2721             :         /* Prepare the list of publication(s) for warning message. */
    2722           0 :         GetPublicationsStr(publist, pubnames, false);
    2723             : 
    2724           0 :         appendStringInfo(err_msg, _("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin"),
    2725             :                          subname);
    2726           0 :         appendStringInfoString(err_hint, _("Verify that initial data copied from the publisher sequences did not come from other origins."));
    2727             : 
    2728           0 :         ereport(WARNING,
    2729             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2730             :                 errmsg_internal("%s", err_msg->data),
    2731             :                 errdetail_plural("The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.",
    2732             :                                  "The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.",
    2733             :                                  list_length(publist), pubnames->data),
    2734             :                 errhint_internal("%s", err_hint->data));
    2735             :     }
    2736             : 
    2737          20 :     ExecDropSingleTupleTableSlot(slot);
    2738             : 
    2739          20 :     walrcv_clear_result(res);
    2740             : }
    2741             : 
    2742             : /*
    2743             :  * Determine whether the retain_dead_tuples can be enabled based on the
    2744             :  * publisher's status.
    2745             :  *
    2746             :  * This option is disallowed if the publisher is running a version earlier
    2747             :  * than the PG19, or if the publisher is in recovery (i.e., it is a standby
    2748             :  * server).
    2749             :  *
    2750             :  * See comments atop worker.c for a detailed explanation.
    2751             :  */
    2752             : static void
    2753          24 : check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
    2754             : {
    2755             :     WalRcvExecResult *res;
    2756          24 :     Oid         RecoveryRow[1] = {BOOLOID};
    2757             :     TupleTableSlot *slot;
    2758             :     bool        isnull;
    2759             :     bool        remote_in_recovery;
    2760             : 
    2761          24 :     if (walrcv_server_version(wrconn) < 19000)
    2762           0 :         ereport(ERROR,
    2763             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2764             :                 errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
    2765             : 
    2766          24 :     res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow);
    2767             : 
    2768          24 :     if (res->status != WALRCV_OK_TUPLES)
    2769           0 :         ereport(ERROR,
    2770             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    2771             :                  errmsg("could not obtain recovery progress from the publisher: %s",
    2772             :                         res->err)));
    2773             : 
    2774          24 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    2775          24 :     if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    2776           0 :         elog(ERROR, "failed to fetch tuple for the recovery progress");
    2777             : 
    2778          24 :     remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull));
    2779             : 
    2780          24 :     if (remote_in_recovery)
    2781           0 :         ereport(ERROR,
    2782             :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2783             :                 errmsg("cannot enable retain_dead_tuples if the publisher is in recovery."));
    2784             : 
    2785          24 :     ExecDropSingleTupleTableSlot(slot);
    2786             : 
    2787          24 :     walrcv_clear_result(res);
    2788          24 : }
    2789             : 
    2790             : /*
    2791             :  * Check if the subscriber's configuration is adequate to enable the
    2792             :  * retain_dead_tuples option.
    2793             :  *
    2794             :  * Issue an ERROR if the wal_level does not support the use of replication
    2795             :  * slots when check_guc is set to true.
    2796             :  *
    2797             :  * Issue a WARNING if track_commit_timestamp is not enabled when check_guc is
    2798             :  * set to true. This is only to highlight the importance of enabling
    2799             :  * track_commit_timestamp instead of catching all the misconfigurations, as
    2800             :  * this setting can be adjusted after subscription creation. Without it, the
    2801             :  * apply worker will simply skip conflict detection.
    2802             :  *
    2803             :  * Issue a WARNING or NOTICE if the subscription is disabled and the retention
    2804             :  * is active. Do not raise an ERROR since users can only modify
    2805             :  * retain_dead_tuples for disabled subscriptions. And as long as the
    2806             :  * subscription is enabled promptly, it will not pose issues.
    2807             :  *
    2808             :  * Issue a NOTICE to inform users that max_retention_duration is
    2809             :  * ineffective when retain_dead_tuples is disabled for a subscription. An ERROR
    2810             :  * is not issued because setting max_retention_duration causes no harm,
    2811             :  * even when it is ineffective.
    2812             :  */
    2813             : void
    2814         480 : CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
    2815             :                            int elevel_for_sub_disabled,
    2816             :                            bool retain_dead_tuples, bool retention_active,
    2817             :                            bool max_retention_set)
    2818             : {
    2819             :     Assert(elevel_for_sub_disabled == NOTICE ||
    2820             :            elevel_for_sub_disabled == WARNING);
    2821             : 
    2822         480 :     if (retain_dead_tuples)
    2823             :     {
    2824          34 :         if (check_guc && wal_level < WAL_LEVEL_REPLICA)
    2825           0 :             ereport(ERROR,
    2826             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2827             :                     errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
    2828             :                     errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
    2829             : 
    2830          34 :         if (check_guc && !track_commit_timestamp)
    2831           8 :             ereport(WARNING,
    2832             :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2833             :                     errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
    2834             :                     errhint("Consider setting \"%s\" to true.",
    2835             :                             "track_commit_timestamp"));
    2836             : 
    2837          34 :         if (sub_disabled && retention_active)
    2838          14 :             ereport(elevel_for_sub_disabled,
    2839             :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2840             :                     errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
    2841             :                     (elevel_for_sub_disabled > NOTICE)
    2842             :                     ? errhint("Consider setting %s to false.",
    2843             :                               "retain_dead_tuples") : 0);
    2844             :     }
    2845         446 :     else if (max_retention_set)
    2846             :     {
    2847           6 :         ereport(NOTICE,
    2848             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2849             :                 errmsg("max_retention_duration is ineffective when retain_dead_tuples is disabled"));
    2850             :     }
    2851         480 : }
    2852             : 
    2853             : /*
    2854             :  * Return true iff 'rv' is a member of the list.
    2855             :  */
    2856             : static bool
    2857         544 : list_member_rangevar(const List *list, RangeVar *rv)
    2858             : {
    2859        2000 :     foreach_ptr(PublicationRelKind, relinfo, list)
    2860             :     {
    2861         916 :         if (equal(relinfo->rv, rv))
    2862           2 :             return true;
    2863             :     }
    2864             : 
    2865         542 :     return false;
    2866             : }
    2867             : 
    2868             : /*
    2869             :  * Get the list of tables and sequences which belong to specified publications
    2870             :  * on the publisher connection.
    2871             :  *
    2872             :  * Note that we don't support the case where the column list is different for
    2873             :  * the same table in different publications to avoid sending unwanted column
    2874             :  * information for some of the rows. This can happen when both the column
    2875             :  * list and row filter are specified for different publications.
    2876             :  */
    2877             : static List *
    2878         306 : fetch_relation_list(WalReceiverConn *wrconn, List *publications)
    2879             : {
    2880             :     WalRcvExecResult *res;
    2881             :     StringInfoData cmd;
    2882             :     TupleTableSlot *slot;
    2883         306 :     Oid         tableRow[4] = {TEXTOID, TEXTOID, CHAROID, InvalidOid};
    2884         306 :     List       *relationlist = NIL;
    2885         306 :     int         server_version = walrcv_server_version(wrconn);
    2886         306 :     bool        check_columnlist = (server_version >= 150000);
    2887         306 :     int         column_count = check_columnlist ? 4 : 3;
    2888         306 :     StringInfo  pub_names = makeStringInfo();
    2889             : 
    2890         306 :     initStringInfo(&cmd);
    2891             : 
    2892             :     /* Build the pub_names comma-separated string. */
    2893         306 :     GetPublicationsStr(publications, pub_names, true);
    2894             : 
    2895             :     /* Get the list of relations from the publisher */
    2896         306 :     if (server_version >= 160000)
    2897             :     {
    2898         306 :         tableRow[3] = INT2VECTOROID;
    2899             : 
    2900             :         /*
    2901             :          * From version 16, we allowed passing multiple publications to the
    2902             :          * function pg_get_publication_tables. This helped to filter out the
    2903             :          * partition table whose ancestor is also published in this
    2904             :          * publication array.
    2905             :          *
    2906             :          * Join pg_get_publication_tables with pg_publication to exclude
    2907             :          * non-existing publications.
    2908             :          *
    2909             :          * Note that attrs are always stored in sorted order so we don't need
    2910             :          * to worry if different publications have specified them in a
    2911             :          * different order. See pub_collist_validate.
    2912             :          */
    2913         306 :         appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, c.relkind, gpt.attrs\n"
    2914             :                          "   FROM pg_class c\n"
    2915             :                          "         JOIN pg_namespace n ON n.oid = c.relnamespace\n"
    2916             :                          "         JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
    2917             :                          "                FROM pg_publication\n"
    2918             :                          "                WHERE pubname IN ( %s )) AS gpt\n"
    2919             :                          "             ON gpt.relid = c.oid\n",
    2920             :                          pub_names->data);
    2921             : 
    2922             :         /* From version 19, inclusion of sequences in the target is supported */
    2923         306 :         if (server_version >= 190000)
    2924         306 :             appendStringInfo(&cmd,
    2925             :                              "UNION ALL\n"
    2926             :                              "  SELECT DISTINCT s.schemaname, s.sequencename, " CppAsString2(RELKIND_SEQUENCE) "::\"char\" AS relkind, NULL::int2vector AS attrs\n"
    2927             :                              "  FROM pg_catalog.pg_publication_sequences s\n"
    2928             :                              "  WHERE s.pubname IN ( %s )",
    2929             :                              pub_names->data);
    2930             :     }
    2931             :     else
    2932             :     {
    2933           0 :         tableRow[3] = NAMEARRAYOID;
    2934           0 :         appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, " CppAsString2(RELKIND_RELATION) "::\"char\" AS relkind \n");
    2935             : 
    2936             :         /* Get column lists for each relation if the publisher supports it */
    2937           0 :         if (check_columnlist)
    2938           0 :             appendStringInfoString(&cmd, ", t.attnames\n");
    2939             : 
    2940           0 :         appendStringInfo(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
    2941             :                          " WHERE t.pubname IN ( %s )",
    2942             :                          pub_names->data);
    2943             :     }
    2944             : 
    2945         306 :     destroyStringInfo(pub_names);
    2946             : 
    2947         306 :     res = walrcv_exec(wrconn, cmd.data, column_count, tableRow);
    2948         306 :     pfree(cmd.data);
    2949             : 
    2950         306 :     if (res->status != WALRCV_OK_TUPLES)
    2951           0 :         ereport(ERROR,
    2952             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    2953             :                  errmsg("could not receive list of replicated tables from the publisher: %s",
    2954             :                         res->err)));
    2955             : 
    2956             :     /* Process tables. */
    2957         306 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    2958         850 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    2959             :     {
    2960             :         char       *nspname;
    2961             :         char       *relname;
    2962             :         bool        isnull;
    2963             :         char        relkind;
    2964         546 :         PublicationRelKind *relinfo = palloc_object(PublicationRelKind);
    2965             : 
    2966         546 :         nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
    2967             :         Assert(!isnull);
    2968         546 :         relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
    2969             :         Assert(!isnull);
    2970         546 :         relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
    2971             :         Assert(!isnull);
    2972             : 
    2973         546 :         relinfo->rv = makeRangeVar(nspname, relname, -1);
    2974         546 :         relinfo->relkind = relkind;
    2975             : 
    2976         546 :         if (relkind != RELKIND_SEQUENCE &&
    2977         544 :             check_columnlist &&
    2978         544 :             list_member_rangevar(relationlist, relinfo->rv))
    2979           2 :             ereport(ERROR,
    2980             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2981             :                     errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
    2982             :                            nspname, relname));
    2983             :         else
    2984         544 :             relationlist = lappend(relationlist, relinfo);
    2985             : 
    2986         544 :         ExecClearTuple(slot);
    2987             :     }
    2988         304 :     ExecDropSingleTupleTableSlot(slot);
    2989             : 
    2990         304 :     walrcv_clear_result(res);
    2991             : 
    2992         304 :     return relationlist;
    2993             : }
    2994             : 
    2995             : /*
    2996             :  * This is to report the connection failure while dropping replication slots.
    2997             :  * Here, we report the WARNING for all tablesync slots so that user can drop
    2998             :  * them manually, if required.
    2999             :  */
    3000             : static void
    3001           0 : ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
    3002             : {
    3003             :     ListCell   *lc;
    3004             : 
    3005           0 :     foreach(lc, rstates)
    3006             :     {
    3007           0 :         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
    3008           0 :         Oid         relid = rstate->relid;
    3009             : 
    3010             :         /* Only cleanup resources of tablesync workers */
    3011           0 :         if (!OidIsValid(relid))
    3012           0 :             continue;
    3013             : 
    3014             :         /*
    3015             :          * Caller needs to ensure that relstate doesn't change underneath us.
    3016             :          * See DropSubscription where we get the relstates.
    3017             :          */
    3018           0 :         if (rstate->state != SUBREL_STATE_SYNCDONE)
    3019             :         {
    3020           0 :             char        syncslotname[NAMEDATALEN] = {0};
    3021             : 
    3022           0 :             ReplicationSlotNameForTablesync(subid, relid, syncslotname,
    3023             :                                             sizeof(syncslotname));
    3024           0 :             elog(WARNING, "could not drop tablesync replication slot \"%s\"",
    3025             :                  syncslotname);
    3026             :         }
    3027             :     }
    3028             : 
    3029           0 :     ereport(ERROR,
    3030             :             (errcode(ERRCODE_CONNECTION_FAILURE),
    3031             :              errmsg("could not connect to publisher when attempting to drop replication slot \"%s\": %s",
    3032             :                     slotname, err),
    3033             :     /* translator: %s is an SQL ALTER command */
    3034             :              errhint("Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
    3035             :                      "ALTER SUBSCRIPTION ... DISABLE",
    3036             :                      "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
    3037             : }
    3038             : 
    3039             : /*
    3040             :  * Check for duplicates in the given list of publications and error out if
    3041             :  * found one.  Add publications to datums as text datums, if datums is not
    3042             :  * NULL.
    3043             :  */
    3044             : static void
    3045         448 : check_duplicates_in_publist(List *publist, Datum *datums)
    3046             : {
    3047             :     ListCell   *cell;
    3048         448 :     int         j = 0;
    3049             : 
    3050        1020 :     foreach(cell, publist)
    3051             :     {
    3052         590 :         char       *name = strVal(lfirst(cell));
    3053             :         ListCell   *pcell;
    3054             : 
    3055         888 :         foreach(pcell, publist)
    3056             :         {
    3057         888 :             char       *pname = strVal(lfirst(pcell));
    3058             : 
    3059         888 :             if (pcell == cell)
    3060         572 :                 break;
    3061             : 
    3062         316 :             if (strcmp(name, pname) == 0)
    3063          18 :                 ereport(ERROR,
    3064             :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
    3065             :                          errmsg("publication name \"%s\" used more than once",
    3066             :                                 pname)));
    3067             :         }
    3068             : 
    3069         572 :         if (datums)
    3070         486 :             datums[j++] = CStringGetTextDatum(name);
    3071             :     }
    3072         430 : }
    3073             : 
    3074             : /*
    3075             :  * Merge current subscription's publications and user-specified publications
    3076             :  * from ADD/DROP PUBLICATIONS.
    3077             :  *
    3078             :  * If addpub is true, we will add the list of publications into oldpublist.
    3079             :  * Otherwise, we will delete the list of publications from oldpublist.  The
    3080             :  * returned list is a copy, oldpublist itself is not changed.
    3081             :  *
    3082             :  * subname is the subscription name, for error messages.
    3083             :  */
    3084             : static List *
    3085          54 : merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
    3086             : {
    3087             :     ListCell   *lc;
    3088             : 
    3089          54 :     oldpublist = list_copy(oldpublist);
    3090             : 
    3091          54 :     check_duplicates_in_publist(newpublist, NULL);
    3092             : 
    3093          92 :     foreach(lc, newpublist)
    3094             :     {
    3095          68 :         char       *name = strVal(lfirst(lc));
    3096             :         ListCell   *lc2;
    3097          68 :         bool        found = false;
    3098             : 
    3099         134 :         foreach(lc2, oldpublist)
    3100             :         {
    3101         110 :             char       *pubname = strVal(lfirst(lc2));
    3102             : 
    3103         110 :             if (strcmp(name, pubname) == 0)
    3104             :             {
    3105          44 :                 found = true;
    3106          44 :                 if (addpub)
    3107          12 :                     ereport(ERROR,
    3108             :                             (errcode(ERRCODE_DUPLICATE_OBJECT),
    3109             :                              errmsg("publication \"%s\" is already in subscription \"%s\"",
    3110             :                                     name, subname)));
    3111             :                 else
    3112          32 :                     oldpublist = foreach_delete_current(oldpublist, lc2);
    3113             : 
    3114          32 :                 break;
    3115             :             }
    3116             :         }
    3117             : 
    3118          56 :         if (addpub && !found)
    3119          18 :             oldpublist = lappend(oldpublist, makeString(name));
    3120          38 :         else if (!addpub && !found)
    3121           6 :             ereport(ERROR,
    3122             :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    3123             :                      errmsg("publication \"%s\" is not in subscription \"%s\"",
    3124             :                             name, subname)));
    3125             :     }
    3126             : 
    3127             :     /*
    3128             :      * XXX Probably no strong reason for this, but for now it's to make ALTER
    3129             :      * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
    3130             :      */
    3131          24 :     if (!oldpublist)
    3132           6 :         ereport(ERROR,
    3133             :                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    3134             :                  errmsg("cannot drop all the publications from a subscription")));
    3135             : 
    3136          18 :     return oldpublist;
    3137             : }
    3138             : 
    3139             : /*
    3140             :  * Extract the streaming mode value from a DefElem.  This is like
    3141             :  * defGetBoolean() but also accepts the special value of "parallel".
    3142             :  */
    3143             : char
    3144         838 : defGetStreamingMode(DefElem *def)
    3145             : {
    3146             :     /*
    3147             :      * If no parameter value given, assume "true" is meant.
    3148             :      */
    3149         838 :     if (!def->arg)
    3150           0 :         return LOGICALREP_STREAM_ON;
    3151             : 
    3152             :     /*
    3153             :      * Allow 0, 1, "false", "true", "off", "on" or "parallel".
    3154             :      */
    3155         838 :     switch (nodeTag(def->arg))
    3156             :     {
    3157           0 :         case T_Integer:
    3158           0 :             switch (intVal(def->arg))
    3159             :             {
    3160           0 :                 case 0:
    3161           0 :                     return LOGICALREP_STREAM_OFF;
    3162           0 :                 case 1:
    3163           0 :                     return LOGICALREP_STREAM_ON;
    3164           0 :                 default:
    3165             :                     /* otherwise, error out below */
    3166           0 :                     break;
    3167             :             }
    3168           0 :             break;
    3169         838 :         default:
    3170             :             {
    3171         838 :                 char       *sval = defGetString(def);
    3172             : 
    3173             :                 /*
    3174             :                  * The set of strings accepted here should match up with the
    3175             :                  * grammar's opt_boolean_or_string production.
    3176             :                  */
    3177        1670 :                 if (pg_strcasecmp(sval, "false") == 0 ||
    3178         832 :                     pg_strcasecmp(sval, "off") == 0)
    3179          12 :                     return LOGICALREP_STREAM_OFF;
    3180        1634 :                 if (pg_strcasecmp(sval, "true") == 0 ||
    3181         808 :                     pg_strcasecmp(sval, "on") == 0)
    3182          92 :                     return LOGICALREP_STREAM_ON;
    3183         734 :                 if (pg_strcasecmp(sval, "parallel") == 0)
    3184         728 :                     return LOGICALREP_STREAM_PARALLEL;
    3185             :             }
    3186           6 :             break;
    3187             :     }
    3188             : 
    3189           6 :     ereport(ERROR,
    3190             :             (errcode(ERRCODE_SYNTAX_ERROR),
    3191             :              errmsg("%s requires a Boolean value or \"parallel\"",
    3192             :                     def->defname)));
    3193             :     return LOGICALREP_STREAM_OFF;   /* keep compiler quiet */
    3194             : }
 |