LCOV - code coverage report
Current view: top level - src/backend/commands - subscriptioncmds.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 895 1017 88.0 %
Date: 2025-11-30 13:18:29 Functions: 21 23 91.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * subscriptioncmds.c
       4             :  *      subscription catalog manipulation functions
       5             :  *
       6             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      src/backend/commands/subscriptioncmds.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/commit_ts.h"
      18             : #include "access/htup_details.h"
      19             : #include "access/table.h"
      20             : #include "access/twophase.h"
      21             : #include "access/xact.h"
      22             : #include "catalog/catalog.h"
      23             : #include "catalog/dependency.h"
      24             : #include "catalog/indexing.h"
      25             : #include "catalog/namespace.h"
      26             : #include "catalog/objectaccess.h"
      27             : #include "catalog/objectaddress.h"
      28             : #include "catalog/pg_authid_d.h"
      29             : #include "catalog/pg_database_d.h"
      30             : #include "catalog/pg_subscription.h"
      31             : #include "catalog/pg_subscription_rel.h"
      32             : #include "catalog/pg_type.h"
      33             : #include "commands/defrem.h"
      34             : #include "commands/event_trigger.h"
      35             : #include "commands/subscriptioncmds.h"
      36             : #include "executor/executor.h"
      37             : #include "miscadmin.h"
      38             : #include "nodes/makefuncs.h"
      39             : #include "pgstat.h"
      40             : #include "replication/logicallauncher.h"
      41             : #include "replication/logicalworker.h"
      42             : #include "replication/origin.h"
      43             : #include "replication/slot.h"
      44             : #include "replication/walreceiver.h"
      45             : #include "replication/walsender.h"
      46             : #include "replication/worker_internal.h"
      47             : #include "storage/lmgr.h"
      48             : #include "utils/acl.h"
      49             : #include "utils/builtins.h"
      50             : #include "utils/guc.h"
      51             : #include "utils/lsyscache.h"
      52             : #include "utils/memutils.h"
      53             : #include "utils/pg_lsn.h"
      54             : #include "utils/syscache.h"
      55             : 
      56             : /*
      57             :  * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
      58             :  * command.
      59             :  */
      60             : #define SUBOPT_CONNECT              0x00000001
      61             : #define SUBOPT_ENABLED              0x00000002
      62             : #define SUBOPT_CREATE_SLOT          0x00000004
      63             : #define SUBOPT_SLOT_NAME            0x00000008
      64             : #define SUBOPT_COPY_DATA            0x00000010
      65             : #define SUBOPT_SYNCHRONOUS_COMMIT   0x00000020
      66             : #define SUBOPT_REFRESH              0x00000040
      67             : #define SUBOPT_BINARY               0x00000080
      68             : #define SUBOPT_STREAMING            0x00000100
      69             : #define SUBOPT_TWOPHASE_COMMIT      0x00000200
      70             : #define SUBOPT_DISABLE_ON_ERR       0x00000400
      71             : #define SUBOPT_PASSWORD_REQUIRED    0x00000800
      72             : #define SUBOPT_RUN_AS_OWNER         0x00001000
      73             : #define SUBOPT_FAILOVER             0x00002000
      74             : #define SUBOPT_RETAIN_DEAD_TUPLES   0x00004000
      75             : #define SUBOPT_MAX_RETENTION_DURATION   0x00008000
      76             : #define SUBOPT_LSN                  0x00010000
      77             : #define SUBOPT_ORIGIN               0x00020000
      78             : 
      79             : /* check if the 'val' has 'bits' set */
      80             : #define IsSet(val, bits)  (((val) & (bits)) == (bits))
      81             : 
      82             : /*
      83             :  * Structure to hold a bitmap representing the user-provided CREATE/ALTER
      84             :  * SUBSCRIPTION command options and the parsed/default values of each of them.
      85             :  */
      86             : typedef struct SubOpts
      87             : {
      88             :     bits32      specified_opts;
      89             :     char       *slot_name;
      90             :     char       *synchronous_commit;
      91             :     bool        connect;
      92             :     bool        enabled;
      93             :     bool        create_slot;
      94             :     bool        copy_data;
      95             :     bool        refresh;
      96             :     bool        binary;
      97             :     char        streaming;
      98             :     bool        twophase;
      99             :     bool        disableonerr;
     100             :     bool        passwordrequired;
     101             :     bool        runasowner;
     102             :     bool        failover;
     103             :     bool        retaindeadtuples;
     104             :     int32       maxretention;
     105             :     char       *origin;
     106             :     XLogRecPtr  lsn;
     107             : } SubOpts;
     108             : 
     109             : /*
     110             :  * PublicationRelKind represents a relation included in a publication.
     111             :  * It stores the schema-qualified relation name (rv) and its kind (relkind).
     112             :  */
     113             : typedef struct PublicationRelKind
     114             : {
     115             :     RangeVar   *rv;
     116             :     char        relkind;
     117             : } PublicationRelKind;
     118             : 
     119             : static List *fetch_relation_list(WalReceiverConn *wrconn, List *publications);
     120             : static void check_publications_origin_tables(WalReceiverConn *wrconn,
     121             :                                              List *publications, bool copydata,
     122             :                                              bool retain_dead_tuples,
     123             :                                              char *origin,
     124             :                                              Oid *subrel_local_oids,
     125             :                                              int subrel_count, char *subname);
     126             : static void check_publications_origin_sequences(WalReceiverConn *wrconn,
     127             :                                                 List *publications,
     128             :                                                 bool copydata, char *origin,
     129             :                                                 Oid *subrel_local_oids,
     130             :                                                 int subrel_count,
     131             :                                                 char *subname);
     132             : static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn);
     133             : static void check_duplicates_in_publist(List *publist, Datum *datums);
     134             : static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
     135             : static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
     136             : static void CheckAlterSubOption(Subscription *sub, const char *option,
     137             :                                 bool slot_needs_update, bool isTopLevel);
     138             : 
     139             : 
     140             : /*
     141             :  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
     142             :  *
     143             :  * Since not all options can be specified in both commands, this function
     144             :  * will report an error if mutually exclusive options are specified.
     145             :  */
     146             : static void
     147         976 : parse_subscription_options(ParseState *pstate, List *stmt_options,
     148             :                            bits32 supported_opts, SubOpts *opts)
     149             : {
     150             :     ListCell   *lc;
     151             : 
     152             :     /* Start out with cleared opts. */
     153         976 :     memset(opts, 0, sizeof(SubOpts));
     154             : 
     155             :     /* caller must expect some option */
     156             :     Assert(supported_opts != 0);
     157             : 
     158             :     /* If connect option is supported, these others also need to be. */
     159             :     Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
     160             :            IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
     161             :                  SUBOPT_COPY_DATA));
     162             : 
     163             :     /* Set default values for the supported options. */
     164         976 :     if (IsSet(supported_opts, SUBOPT_CONNECT))
     165         482 :         opts->connect = true;
     166         976 :     if (IsSet(supported_opts, SUBOPT_ENABLED))
     167         586 :         opts->enabled = true;
     168         976 :     if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
     169         482 :         opts->create_slot = true;
     170         976 :     if (IsSet(supported_opts, SUBOPT_COPY_DATA))
     171         630 :         opts->copy_data = true;
     172         976 :     if (IsSet(supported_opts, SUBOPT_REFRESH))
     173          86 :         opts->refresh = true;
     174         976 :     if (IsSet(supported_opts, SUBOPT_BINARY))
     175         700 :         opts->binary = false;
     176         976 :     if (IsSet(supported_opts, SUBOPT_STREAMING))
     177         700 :         opts->streaming = LOGICALREP_STREAM_PARALLEL;
     178         976 :     if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
     179         700 :         opts->twophase = false;
     180         976 :     if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
     181         700 :         opts->disableonerr = false;
     182         976 :     if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED))
     183         700 :         opts->passwordrequired = true;
     184         976 :     if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER))
     185         700 :         opts->runasowner = false;
     186         976 :     if (IsSet(supported_opts, SUBOPT_FAILOVER))
     187         700 :         opts->failover = false;
     188         976 :     if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES))
     189         700 :         opts->retaindeadtuples = false;
     190         976 :     if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION))
     191         700 :         opts->maxretention = 0;
     192         976 :     if (IsSet(supported_opts, SUBOPT_ORIGIN))
     193         700 :         opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
     194             : 
     195             :     /* Parse options */
     196        1928 :     foreach(lc, stmt_options)
     197             :     {
     198        1018 :         DefElem    *defel = (DefElem *) lfirst(lc);
     199             : 
     200        1018 :         if (IsSet(supported_opts, SUBOPT_CONNECT) &&
     201         592 :             strcmp(defel->defname, "connect") == 0)
     202             :         {
     203         190 :             if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
     204           0 :                 errorConflictingDefElem(defel, pstate);
     205             : 
     206         190 :             opts->specified_opts |= SUBOPT_CONNECT;
     207         190 :             opts->connect = defGetBoolean(defel);
     208             :         }
     209         828 :         else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
     210         506 :                  strcmp(defel->defname, "enabled") == 0)
     211             :         {
     212         142 :             if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
     213           0 :                 errorConflictingDefElem(defel, pstate);
     214             : 
     215         142 :             opts->specified_opts |= SUBOPT_ENABLED;
     216         142 :             opts->enabled = defGetBoolean(defel);
     217             :         }
     218         686 :         else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
     219         364 :                  strcmp(defel->defname, "create_slot") == 0)
     220             :         {
     221          40 :             if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
     222           0 :                 errorConflictingDefElem(defel, pstate);
     223             : 
     224          40 :             opts->specified_opts |= SUBOPT_CREATE_SLOT;
     225          40 :             opts->create_slot = defGetBoolean(defel);
     226             :         }
     227         646 :         else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
     228         546 :                  strcmp(defel->defname, "slot_name") == 0)
     229             :         {
     230         156 :             if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
     231           0 :                 errorConflictingDefElem(defel, pstate);
     232             : 
     233         156 :             opts->specified_opts |= SUBOPT_SLOT_NAME;
     234         156 :             opts->slot_name = defGetString(defel);
     235             : 
     236             :             /* Setting slot_name = NONE is treated as no slot name. */
     237         156 :             if (strcmp(opts->slot_name, "none") == 0)
     238         122 :                 opts->slot_name = NULL;
     239             :             else
     240          34 :                 ReplicationSlotValidateName(opts->slot_name, false, ERROR);
     241             :         }
     242         490 :         else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
     243         322 :                  strcmp(defel->defname, "copy_data") == 0)
     244             :         {
     245          58 :             if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
     246           0 :                 errorConflictingDefElem(defel, pstate);
     247             : 
     248          58 :             opts->specified_opts |= SUBOPT_COPY_DATA;
     249          58 :             opts->copy_data = defGetBoolean(defel);
     250             :         }
     251         432 :         else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
     252         342 :                  strcmp(defel->defname, "synchronous_commit") == 0)
     253             :         {
     254          12 :             if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
     255           0 :                 errorConflictingDefElem(defel, pstate);
     256             : 
     257          12 :             opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
     258          12 :             opts->synchronous_commit = defGetString(defel);
     259             : 
     260             :             /* Test if the given value is valid for synchronous_commit GUC. */
     261          12 :             (void) set_config_option("synchronous_commit", opts->synchronous_commit,
     262             :                                      PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
     263             :                                      false, 0, false);
     264             :         }
     265         420 :         else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
     266          66 :                  strcmp(defel->defname, "refresh") == 0)
     267             :         {
     268          66 :             if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
     269           0 :                 errorConflictingDefElem(defel, pstate);
     270             : 
     271          66 :             opts->specified_opts |= SUBOPT_REFRESH;
     272          66 :             opts->refresh = defGetBoolean(defel);
     273             :         }
     274         354 :         else if (IsSet(supported_opts, SUBOPT_BINARY) &&
     275         330 :                  strcmp(defel->defname, "binary") == 0)
     276             :         {
     277          32 :             if (IsSet(opts->specified_opts, SUBOPT_BINARY))
     278           0 :                 errorConflictingDefElem(defel, pstate);
     279             : 
     280          32 :             opts->specified_opts |= SUBOPT_BINARY;
     281          32 :             opts->binary = defGetBoolean(defel);
     282             :         }
     283         322 :         else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
     284         298 :                  strcmp(defel->defname, "streaming") == 0)
     285             :         {
     286          74 :             if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
     287           0 :                 errorConflictingDefElem(defel, pstate);
     288             : 
     289          74 :             opts->specified_opts |= SUBOPT_STREAMING;
     290          74 :             opts->streaming = defGetStreamingMode(defel);
     291             :         }
     292         248 :         else if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
     293         224 :                  strcmp(defel->defname, "two_phase") == 0)
     294             :         {
     295          42 :             if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
     296           0 :                 errorConflictingDefElem(defel, pstate);
     297             : 
     298          42 :             opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
     299          42 :             opts->twophase = defGetBoolean(defel);
     300             :         }
     301         206 :         else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
     302         182 :                  strcmp(defel->defname, "disable_on_error") == 0)
     303             :         {
     304          20 :             if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
     305           0 :                 errorConflictingDefElem(defel, pstate);
     306             : 
     307          20 :             opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
     308          20 :             opts->disableonerr = defGetBoolean(defel);
     309             :         }
     310         186 :         else if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED) &&
     311         162 :                  strcmp(defel->defname, "password_required") == 0)
     312             :         {
     313          24 :             if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
     314           0 :                 errorConflictingDefElem(defel, pstate);
     315             : 
     316          24 :             opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
     317          24 :             opts->passwordrequired = defGetBoolean(defel);
     318             :         }
     319         162 :         else if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER) &&
     320         138 :                  strcmp(defel->defname, "run_as_owner") == 0)
     321             :         {
     322          18 :             if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
     323           0 :                 errorConflictingDefElem(defel, pstate);
     324             : 
     325          18 :             opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
     326          18 :             opts->runasowner = defGetBoolean(defel);
     327             :         }
     328         144 :         else if (IsSet(supported_opts, SUBOPT_FAILOVER) &&
     329         120 :                  strcmp(defel->defname, "failover") == 0)
     330             :         {
     331          26 :             if (IsSet(opts->specified_opts, SUBOPT_FAILOVER))
     332           0 :                 errorConflictingDefElem(defel, pstate);
     333             : 
     334          26 :             opts->specified_opts |= SUBOPT_FAILOVER;
     335          26 :             opts->failover = defGetBoolean(defel);
     336             :         }
     337         118 :         else if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES) &&
     338          94 :                  strcmp(defel->defname, "retain_dead_tuples") == 0)
     339             :         {
     340          24 :             if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
     341           0 :                 errorConflictingDefElem(defel, pstate);
     342             : 
     343          24 :             opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
     344          24 :             opts->retaindeadtuples = defGetBoolean(defel);
     345             :         }
     346          94 :         else if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION) &&
     347          70 :                  strcmp(defel->defname, "max_retention_duration") == 0)
     348             :         {
     349          22 :             if (IsSet(opts->specified_opts, SUBOPT_MAX_RETENTION_DURATION))
     350           0 :                 errorConflictingDefElem(defel, pstate);
     351             : 
     352          22 :             opts->specified_opts |= SUBOPT_MAX_RETENTION_DURATION;
     353          22 :             opts->maxretention = defGetInt32(defel);
     354             :         }
     355          72 :         else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
     356          48 :                  strcmp(defel->defname, "origin") == 0)
     357             :         {
     358          42 :             if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
     359           0 :                 errorConflictingDefElem(defel, pstate);
     360             : 
     361          42 :             opts->specified_opts |= SUBOPT_ORIGIN;
     362          42 :             pfree(opts->origin);
     363             : 
     364             :             /*
     365             :              * Even though the "origin" parameter allows only "none" and "any"
     366             :              * values, it is implemented as a string type so that the
     367             :              * parameter can be extended in future versions to support
     368             :              * filtering using origin names specified by the user.
     369             :              */
     370          42 :             opts->origin = defGetString(defel);
     371             : 
     372          58 :             if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
     373          16 :                 (pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
     374           6 :                 ereport(ERROR,
     375             :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     376             :                         errmsg("unrecognized origin value: \"%s\"", opts->origin));
     377             :         }
     378          30 :         else if (IsSet(supported_opts, SUBOPT_LSN) &&
     379          24 :                  strcmp(defel->defname, "lsn") == 0)
     380          18 :         {
     381          24 :             char       *lsn_str = defGetString(defel);
     382             :             XLogRecPtr  lsn;
     383             : 
     384          24 :             if (IsSet(opts->specified_opts, SUBOPT_LSN))
     385           0 :                 errorConflictingDefElem(defel, pstate);
     386             : 
     387             :             /* Setting lsn = NONE is treated as resetting LSN */
     388          24 :             if (strcmp(lsn_str, "none") == 0)
     389           6 :                 lsn = InvalidXLogRecPtr;
     390             :             else
     391             :             {
     392             :                 /* Parse the argument as LSN */
     393          18 :                 lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
     394             :                                                       CStringGetDatum(lsn_str)));
     395             : 
     396          18 :                 if (!XLogRecPtrIsValid(lsn))
     397           6 :                     ereport(ERROR,
     398             :                             (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     399             :                              errmsg("invalid WAL location (LSN): %s", lsn_str)));
     400             :             }
     401             : 
     402          18 :             opts->specified_opts |= SUBOPT_LSN;
     403          18 :             opts->lsn = lsn;
     404             :         }
     405             :         else
     406           6 :             ereport(ERROR,
     407             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     408             :                      errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
     409             :     }
     410             : 
     411             :     /*
     412             :      * We've been explicitly asked to not connect, that requires some
     413             :      * additional processing.
     414             :      */
     415         910 :     if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
     416             :     {
     417             :         /* Check for incompatible options from the user. */
     418         148 :         if (opts->enabled &&
     419         148 :             IsSet(opts->specified_opts, SUBOPT_ENABLED))
     420           6 :             ereport(ERROR,
     421             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     422             :             /*- translator: both %s are strings of the form "option = value" */
     423             :                      errmsg("%s and %s are mutually exclusive options",
     424             :                             "connect = false", "enabled = true")));
     425             : 
     426         142 :         if (opts->create_slot &&
     427         136 :             IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
     428           6 :             ereport(ERROR,
     429             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     430             :                      errmsg("%s and %s are mutually exclusive options",
     431             :                             "connect = false", "create_slot = true")));
     432             : 
     433         136 :         if (opts->copy_data &&
     434         130 :             IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
     435           6 :             ereport(ERROR,
     436             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     437             :                      errmsg("%s and %s are mutually exclusive options",
     438             :                             "connect = false", "copy_data = true")));
     439             : 
     440             :         /* Change the defaults of other options. */
     441         130 :         opts->enabled = false;
     442         130 :         opts->create_slot = false;
     443         130 :         opts->copy_data = false;
     444             :     }
     445             : 
     446             :     /*
     447             :      * Do additional checking for disallowed combination when slot_name = NONE
     448             :      * was used.
     449             :      */
     450         892 :     if (!opts->slot_name &&
     451         864 :         IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
     452             :     {
     453         116 :         if (opts->enabled)
     454             :         {
     455          18 :             if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
     456           6 :                 ereport(ERROR,
     457             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     458             :                 /*- translator: both %s are strings of the form "option = value" */
     459             :                          errmsg("%s and %s are mutually exclusive options",
     460             :                                 "slot_name = NONE", "enabled = true")));
     461             :             else
     462          12 :                 ereport(ERROR,
     463             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     464             :                 /*- translator: both %s are strings of the form "option = value" */
     465             :                          errmsg("subscription with %s must also set %s",
     466             :                                 "slot_name = NONE", "enabled = false")));
     467             :         }
     468             : 
     469          98 :         if (opts->create_slot)
     470             :         {
     471          12 :             if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
     472           6 :                 ereport(ERROR,
     473             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     474             :                 /*- translator: both %s are strings of the form "option = value" */
     475             :                          errmsg("%s and %s are mutually exclusive options",
     476             :                                 "slot_name = NONE", "create_slot = true")));
     477             :             else
     478           6 :                 ereport(ERROR,
     479             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     480             :                 /*- translator: both %s are strings of the form "option = value" */
     481             :                          errmsg("subscription with %s must also set %s",
     482             :                                 "slot_name = NONE", "create_slot = false")));
     483             :         }
     484             :     }
     485         862 : }
     486             : 
     487             : /*
     488             :  * Check that the specified publications are present on the publisher.
     489             :  */
     490             : static void
     491         256 : check_publications(WalReceiverConn *wrconn, List *publications)
     492             : {
     493             :     WalRcvExecResult *res;
     494             :     StringInfoData cmd;
     495             :     TupleTableSlot *slot;
     496         256 :     List       *publicationsCopy = NIL;
     497         256 :     Oid         tableRow[1] = {TEXTOID};
     498             : 
     499         256 :     initStringInfo(&cmd);
     500         256 :     appendStringInfoString(&cmd, "SELECT t.pubname FROM\n"
     501             :                            " pg_catalog.pg_publication t WHERE\n"
     502             :                            " t.pubname IN (");
     503         256 :     GetPublicationsStr(publications, &cmd, true);
     504         256 :     appendStringInfoChar(&cmd, ')');
     505             : 
     506         256 :     res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
     507         256 :     pfree(cmd.data);
     508             : 
     509         256 :     if (res->status != WALRCV_OK_TUPLES)
     510           0 :         ereport(ERROR,
     511             :                 errmsg("could not receive list of publications from the publisher: %s",
     512             :                        res->err));
     513             : 
     514         256 :     publicationsCopy = list_copy(publications);
     515             : 
     516             :     /* Process publication(s). */
     517         256 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     518         568 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     519             :     {
     520             :         char       *pubname;
     521             :         bool        isnull;
     522             : 
     523         312 :         pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
     524             :         Assert(!isnull);
     525             : 
     526             :         /* Delete the publication present in publisher from the list. */
     527         312 :         publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
     528         312 :         ExecClearTuple(slot);
     529             :     }
     530             : 
     531         256 :     ExecDropSingleTupleTableSlot(slot);
     532             : 
     533         256 :     walrcv_clear_result(res);
     534             : 
     535         256 :     if (list_length(publicationsCopy))
     536             :     {
     537             :         /* Prepare the list of non-existent publication(s) for error message. */
     538             :         StringInfoData pubnames;
     539             : 
     540           8 :         initStringInfo(&pubnames);
     541             : 
     542           8 :         GetPublicationsStr(publicationsCopy, &pubnames, false);
     543           8 :         ereport(WARNING,
     544             :                 errcode(ERRCODE_UNDEFINED_OBJECT),
     545             :                 errmsg_plural("publication %s does not exist on the publisher",
     546             :                               "publications %s do not exist on the publisher",
     547             :                               list_length(publicationsCopy),
     548             :                               pubnames.data));
     549             :     }
     550         256 : }
     551             : 
     552             : /*
     553             :  * Auxiliary function to build a text array out of a list of String nodes.
     554             :  */
     555             : static Datum
     556         394 : publicationListToArray(List *publist)
     557             : {
     558             :     ArrayType  *arr;
     559             :     Datum      *datums;
     560             :     MemoryContext memcxt;
     561             :     MemoryContext oldcxt;
     562             : 
     563             :     /* Create memory context for temporary allocations. */
     564         394 :     memcxt = AllocSetContextCreate(CurrentMemoryContext,
     565             :                                    "publicationListToArray to array",
     566             :                                    ALLOCSET_DEFAULT_SIZES);
     567         394 :     oldcxt = MemoryContextSwitchTo(memcxt);
     568             : 
     569         394 :     datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
     570             : 
     571         394 :     check_duplicates_in_publist(publist, datums);
     572             : 
     573         388 :     MemoryContextSwitchTo(oldcxt);
     574             : 
     575         388 :     arr = construct_array_builtin(datums, list_length(publist), TEXTOID);
     576             : 
     577         388 :     MemoryContextDelete(memcxt);
     578             : 
     579         388 :     return PointerGetDatum(arr);
     580             : }
     581             : 
     582             : /*
     583             :  * Create new subscription.
     584             :  */
     585             : ObjectAddress
     586         482 : CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
     587             :                    bool isTopLevel)
     588             : {
     589             :     Relation    rel;
     590             :     ObjectAddress myself;
     591             :     Oid         subid;
     592             :     bool        nulls[Natts_pg_subscription];
     593             :     Datum       values[Natts_pg_subscription];
     594         482 :     Oid         owner = GetUserId();
     595             :     HeapTuple   tup;
     596             :     char       *conninfo;
     597             :     char        originname[NAMEDATALEN];
     598             :     List       *publications;
     599             :     bits32      supported_opts;
     600         482 :     SubOpts     opts = {0};
     601             :     AclResult   aclresult;
     602             : 
     603             :     /*
     604             :      * Parse and check options.
     605             :      *
     606             :      * Connection and publication should not be specified here.
     607             :      */
     608         482 :     supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
     609             :                       SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
     610             :                       SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
     611             :                       SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
     612             :                       SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
     613             :                       SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
     614             :                       SUBOPT_RETAIN_DEAD_TUPLES |
     615             :                       SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN);
     616         482 :     parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
     617             : 
     618             :     /*
     619             :      * Since creating a replication slot is not transactional, rolling back
     620             :      * the transaction leaves the created replication slot.  So we cannot run
     621             :      * CREATE SUBSCRIPTION inside a transaction block if creating a
     622             :      * replication slot.
     623             :      */
     624         392 :     if (opts.create_slot)
     625         252 :         PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
     626             : 
     627             :     /*
     628             :      * We don't want to allow unprivileged users to be able to trigger
     629             :      * attempts to access arbitrary network destinations, so require the user
     630             :      * to have been specifically authorized to create subscriptions.
     631             :      */
     632         386 :     if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
     633           6 :         ereport(ERROR,
     634             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     635             :                  errmsg("permission denied to create subscription"),
     636             :                  errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
     637             :                            "pg_create_subscription")));
     638             : 
     639             :     /*
     640             :      * Since a subscription is a database object, we also check for CREATE
     641             :      * permission on the database.
     642             :      */
     643         380 :     aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
     644             :                                 owner, ACL_CREATE);
     645         380 :     if (aclresult != ACLCHECK_OK)
     646          12 :         aclcheck_error(aclresult, OBJECT_DATABASE,
     647           6 :                        get_database_name(MyDatabaseId));
     648             : 
     649             :     /*
     650             :      * Non-superusers are required to set a password for authentication, and
     651             :      * that password must be used by the target server, but the superuser can
     652             :      * exempt a subscription from this requirement.
     653             :      */
     654         374 :     if (!opts.passwordrequired && !superuser_arg(owner))
     655           6 :         ereport(ERROR,
     656             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     657             :                  errmsg("password_required=false is superuser-only"),
     658             :                  errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
     659             : 
     660             :     /*
     661             :      * If built with appropriate switch, whine when regression-testing
     662             :      * conventions for subscription names are violated.
     663             :      */
     664             : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
     665             :     if (strncmp(stmt->subname, "regress_", 8) != 0)
     666             :         elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
     667             : #endif
     668             : 
     669         368 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     670             : 
     671             :     /* Check if name is used */
     672         368 :     subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
     673             :                             ObjectIdGetDatum(MyDatabaseId), CStringGetDatum(stmt->subname));
     674         368 :     if (OidIsValid(subid))
     675             :     {
     676           6 :         ereport(ERROR,
     677             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     678             :                  errmsg("subscription \"%s\" already exists",
     679             :                         stmt->subname)));
     680             :     }
     681             : 
     682             :     /*
     683             :      * Ensure that system configuration parameters are set appropriately to
     684             :      * support retain_dead_tuples and max_retention_duration.
     685             :      */
     686         362 :     CheckSubDeadTupleRetention(true, !opts.enabled, WARNING,
     687         362 :                                opts.retaindeadtuples, opts.retaindeadtuples,
     688         362 :                                (opts.maxretention > 0));
     689             : 
     690         362 :     if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
     691         320 :         opts.slot_name == NULL)
     692         320 :         opts.slot_name = stmt->subname;
     693             : 
     694             :     /* The default for synchronous_commit of subscriptions is off. */
     695         362 :     if (opts.synchronous_commit == NULL)
     696         362 :         opts.synchronous_commit = "off";
     697             : 
     698         362 :     conninfo = stmt->conninfo;
     699         362 :     publications = stmt->publication;
     700             : 
     701             :     /* Load the library providing us libpq calls. */
     702         362 :     load_file("libpqwalreceiver", false);
     703             : 
     704             :     /* Check the connection info string. */
     705         362 :     walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
     706             : 
     707             :     /* Everything ok, form a new tuple. */
     708         344 :     memset(values, 0, sizeof(values));
     709         344 :     memset(nulls, false, sizeof(nulls));
     710             : 
     711         344 :     subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
     712             :                                Anum_pg_subscription_oid);
     713         344 :     values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
     714         344 :     values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
     715         344 :     values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
     716         344 :     values[Anum_pg_subscription_subname - 1] =
     717         344 :         DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
     718         344 :     values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
     719         344 :     values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
     720         344 :     values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
     721         344 :     values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
     722         344 :     values[Anum_pg_subscription_subtwophasestate - 1] =
     723         344 :         CharGetDatum(opts.twophase ?
     724             :                      LOGICALREP_TWOPHASE_STATE_PENDING :
     725             :                      LOGICALREP_TWOPHASE_STATE_DISABLED);
     726         344 :     values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
     727         344 :     values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
     728         344 :     values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
     729         344 :     values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
     730         344 :     values[Anum_pg_subscription_subretaindeadtuples - 1] =
     731         344 :         BoolGetDatum(opts.retaindeadtuples);
     732         344 :     values[Anum_pg_subscription_submaxretention - 1] =
     733         344 :         Int32GetDatum(opts.maxretention);
     734         344 :     values[Anum_pg_subscription_subretentionactive - 1] =
     735         344 :         Int32GetDatum(opts.retaindeadtuples);
     736         344 :     values[Anum_pg_subscription_subconninfo - 1] =
     737         344 :         CStringGetTextDatum(conninfo);
     738         344 :     if (opts.slot_name)
     739         324 :         values[Anum_pg_subscription_subslotname - 1] =
     740         324 :             DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
     741             :     else
     742          20 :         nulls[Anum_pg_subscription_subslotname - 1] = true;
     743         344 :     values[Anum_pg_subscription_subsynccommit - 1] =
     744         344 :         CStringGetTextDatum(opts.synchronous_commit);
     745         338 :     values[Anum_pg_subscription_subpublications - 1] =
     746         344 :         publicationListToArray(publications);
     747         338 :     values[Anum_pg_subscription_suborigin - 1] =
     748         338 :         CStringGetTextDatum(opts.origin);
     749             : 
     750         338 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     751             : 
     752             :     /* Insert tuple into catalog. */
     753         338 :     CatalogTupleInsert(rel, tup);
     754         338 :     heap_freetuple(tup);
     755             : 
     756         338 :     recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
     757             : 
     758             :     /*
     759             :      * A replication origin is currently created for all subscriptions,
     760             :      * including those that only contain sequences or are otherwise empty.
     761             :      *
     762             :      * XXX: While this is technically unnecessary, optimizing it would require
     763             :      * additional logic to skip origin creation during DDL operations and
     764             :      * apply workers initialization, and to handle origin creation dynamically
     765             :      * when tables are added to the subscription. It is not clear whether
     766             :      * preventing creation of origins is worth additional complexity.
     767             :      */
     768         338 :     ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
     769         338 :     replorigin_create(originname);
     770             : 
     771             :     /*
     772             :      * Connect to remote side to execute requested commands and fetch table
     773             :      * and sequence info.
     774             :      */
     775         338 :     if (opts.connect)
     776             :     {
     777             :         char       *err;
     778             :         WalReceiverConn *wrconn;
     779             :         bool        must_use_password;
     780             : 
     781             :         /* Try to connect to the publisher. */
     782         244 :         must_use_password = !superuser_arg(owner) && opts.passwordrequired;
     783         244 :         wrconn = walrcv_connect(conninfo, true, true, must_use_password,
     784             :                                 stmt->subname, &err);
     785         244 :         if (!wrconn)
     786           6 :             ereport(ERROR,
     787             :                     (errcode(ERRCODE_CONNECTION_FAILURE),
     788             :                      errmsg("subscription \"%s\" could not connect to the publisher: %s",
     789             :                             stmt->subname, err)));
     790             : 
     791         238 :         PG_TRY();
     792             :         {
     793         238 :             bool        has_tables = false;
     794             :             List       *pubrels;
     795             :             char        relation_state;
     796             : 
     797         238 :             check_publications(wrconn, publications);
     798         238 :             check_publications_origin_tables(wrconn, publications,
     799         238 :                                              opts.copy_data,
     800         238 :                                              opts.retaindeadtuples, opts.origin,
     801             :                                              NULL, 0, stmt->subname);
     802         238 :             check_publications_origin_sequences(wrconn, publications,
     803         238 :                                                 opts.copy_data, opts.origin,
     804             :                                                 NULL, 0, stmt->subname);
     805             : 
     806         238 :             if (opts.retaindeadtuples)
     807           6 :                 check_pub_dead_tuple_retention(wrconn);
     808             : 
     809             :             /*
     810             :              * Set sync state based on if we were asked to do data copy or
     811             :              * not.
     812             :              */
     813         238 :             relation_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
     814             : 
     815             :             /*
     816             :              * Build local relation status info. Relations are for both tables
     817             :              * and sequences from the publisher.
     818             :              */
     819         238 :             pubrels = fetch_relation_list(wrconn, publications);
     820             : 
     821         842 :             foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
     822             :             {
     823             :                 Oid         relid;
     824             :                 char        relkind;
     825         370 :                 RangeVar   *rv = pubrelinfo->rv;
     826             : 
     827         370 :                 relid = RangeVarGetRelid(rv, AccessShareLock, false);
     828         370 :                 relkind = get_rel_relkind(relid);
     829             : 
     830             :                 /* Check for supported relkind. */
     831         370 :                 CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
     832         370 :                                          rv->schemaname, rv->relname);
     833         370 :                 has_tables |= (relkind != RELKIND_SEQUENCE);
     834         370 :                 AddSubscriptionRelState(subid, relid, relation_state,
     835             :                                         InvalidXLogRecPtr, true);
     836             :             }
     837             : 
     838             :             /*
     839             :              * If requested, create permanent slot for the subscription. We
     840             :              * won't use the initial snapshot for anything, so no need to
     841             :              * export it.
     842             :              *
     843             :              * XXX: Similar to origins, it is not clear whether preventing the
     844             :              * slot creation for empty and sequence-only subscriptions is
     845             :              * worth additional complexity.
     846             :              */
     847         236 :             if (opts.create_slot)
     848             :             {
     849         226 :                 bool        twophase_enabled = false;
     850             : 
     851             :                 Assert(opts.slot_name);
     852             : 
     853             :                 /*
     854             :                  * Even if two_phase is set, don't create the slot with
     855             :                  * two-phase enabled. Will enable it once all the tables are
     856             :                  * synced and ready. This avoids race-conditions like prepared
     857             :                  * transactions being skipped due to changes not being applied
     858             :                  * due to checks in should_apply_changes_for_rel() when
     859             :                  * tablesync for the corresponding tables are in progress. See
     860             :                  * comments atop worker.c.
     861             :                  *
     862             :                  * Note that if tables were specified but copy_data is false
     863             :                  * then it is safe to enable two_phase up-front because those
     864             :                  * tables are already initially in READY state. When the
     865             :                  * subscription has no tables, we leave the twophase state as
     866             :                  * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
     867             :                  * PUBLICATION to work.
     868             :                  */
     869         226 :                 if (opts.twophase && !opts.copy_data && has_tables)
     870           2 :                     twophase_enabled = true;
     871             : 
     872         226 :                 walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
     873             :                                    opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
     874             : 
     875         226 :                 if (twophase_enabled)
     876           2 :                     UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
     877             : 
     878         226 :                 ereport(NOTICE,
     879             :                         (errmsg("created replication slot \"%s\" on publisher",
     880             :                                 opts.slot_name)));
     881             :             }
     882             :         }
     883           2 :         PG_FINALLY();
     884             :         {
     885         238 :             walrcv_disconnect(wrconn);
     886             :         }
     887         238 :         PG_END_TRY();
     888             :     }
     889             :     else
     890          94 :         ereport(WARNING,
     891             :                 (errmsg("subscription was created, but is not connected"),
     892             :                  errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.")));
     893             : 
     894         330 :     table_close(rel, RowExclusiveLock);
     895             : 
     896         330 :     pgstat_create_subscription(subid);
     897             : 
     898             :     /*
     899             :      * Notify the launcher to start the apply worker if the subscription is
     900             :      * enabled, or to create the conflict detection slot if retain_dead_tuples
     901             :      * is enabled.
     902             :      *
     903             :      * Creating the conflict detection slot is essential even when the
     904             :      * subscription is not enabled. This ensures that dead tuples are
     905             :      * retained, which is necessary for accurately identifying the type of
     906             :      * conflict during replication.
     907             :      */
     908         330 :     if (opts.enabled || opts.retaindeadtuples)
     909         226 :         ApplyLauncherWakeupAtCommit();
     910             : 
     911         330 :     ObjectAddressSet(myself, SubscriptionRelationId, subid);
     912             : 
     913         330 :     InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
     914             : 
     915         330 :     return myself;
     916             : }
     917             : 
     918             : static void
     919          76 : AlterSubscription_refresh(Subscription *sub, bool copy_data,
     920             :                           List *validate_publications)
     921             : {
     922             :     char       *err;
     923          76 :     List       *pubrels = NIL;
     924             :     Oid        *pubrel_local_oids;
     925             :     List       *subrel_states;
     926          76 :     List       *sub_remove_rels = NIL;
     927             :     Oid        *subrel_local_oids;
     928             :     Oid        *subseq_local_oids;
     929             :     int         subrel_count;
     930             :     ListCell   *lc;
     931             :     int         off;
     932          76 :     int         tbl_count = 0;
     933          76 :     int         seq_count = 0;
     934          76 :     Relation    rel = NULL;
     935             :     typedef struct SubRemoveRels
     936             :     {
     937             :         Oid         relid;
     938             :         char        state;
     939             :     } SubRemoveRels;
     940             : 
     941             :     WalReceiverConn *wrconn;
     942             :     bool        must_use_password;
     943             : 
     944             :     /* Load the library providing us libpq calls. */
     945          76 :     load_file("libpqwalreceiver", false);
     946             : 
     947             :     /* Try to connect to the publisher. */
     948          76 :     must_use_password = sub->passwordrequired && !sub->ownersuperuser;
     949          76 :     wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
     950             :                             sub->name, &err);
     951          74 :     if (!wrconn)
     952           0 :         ereport(ERROR,
     953             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
     954             :                  errmsg("subscription \"%s\" could not connect to the publisher: %s",
     955             :                         sub->name, err)));
     956             : 
     957          74 :     PG_TRY();
     958             :     {
     959          74 :         if (validate_publications)
     960          18 :             check_publications(wrconn, validate_publications);
     961             : 
     962             :         /* Get the relation list from publisher. */
     963          74 :         pubrels = fetch_relation_list(wrconn, sub->publications);
     964             : 
     965             :         /* Get local relation list. */
     966          74 :         subrel_states = GetSubscriptionRelations(sub->oid, true, true, false);
     967          74 :         subrel_count = list_length(subrel_states);
     968             : 
     969             :         /*
     970             :          * Build qsorted arrays of local table oids and sequence oids for
     971             :          * faster lookup. This can potentially contain all tables and
     972             :          * sequences in the database so speed of lookup is important.
     973             :          *
     974             :          * We do not yet know the exact count of tables and sequences, so we
     975             :          * allocate separate arrays for table OIDs and sequence OIDs based on
     976             :          * the total number of relations (subrel_count).
     977             :          */
     978          74 :         subrel_local_oids = palloc(subrel_count * sizeof(Oid));
     979          74 :         subseq_local_oids = palloc(subrel_count * sizeof(Oid));
     980         268 :         foreach(lc, subrel_states)
     981             :         {
     982         194 :             SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
     983             : 
     984         194 :             if (get_rel_relkind(relstate->relid) == RELKIND_SEQUENCE)
     985          18 :                 subseq_local_oids[seq_count++] = relstate->relid;
     986             :             else
     987         176 :                 subrel_local_oids[tbl_count++] = relstate->relid;
     988             :         }
     989             : 
     990          74 :         qsort(subrel_local_oids, tbl_count, sizeof(Oid), oid_cmp);
     991          74 :         check_publications_origin_tables(wrconn, sub->publications, copy_data,
     992          74 :                                          sub->retaindeadtuples, sub->origin,
     993             :                                          subrel_local_oids, tbl_count,
     994             :                                          sub->name);
     995             : 
     996          74 :         qsort(subseq_local_oids, seq_count, sizeof(Oid), oid_cmp);
     997          74 :         check_publications_origin_sequences(wrconn, sub->publications,
     998             :                                             copy_data, sub->origin,
     999             :                                             subseq_local_oids, seq_count,
    1000             :                                             sub->name);
    1001             : 
    1002             :         /*
    1003             :          * Walk over the remote relations and try to match them to locally
    1004             :          * known relations. If the relation is not known locally create a new
    1005             :          * state for it.
    1006             :          *
    1007             :          * Also builds array of local oids of remote relations for the next
    1008             :          * step.
    1009             :          */
    1010          74 :         off = 0;
    1011          74 :         pubrel_local_oids = palloc(list_length(pubrels) * sizeof(Oid));
    1012             : 
    1013         352 :         foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
    1014             :         {
    1015         204 :             RangeVar   *rv = pubrelinfo->rv;
    1016             :             Oid         relid;
    1017             :             char        relkind;
    1018             : 
    1019         204 :             relid = RangeVarGetRelid(rv, AccessShareLock, false);
    1020         204 :             relkind = get_rel_relkind(relid);
    1021             : 
    1022             :             /* Check for supported relkind. */
    1023         204 :             CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
    1024         204 :                                      rv->schemaname, rv->relname);
    1025             : 
    1026         204 :             pubrel_local_oids[off++] = relid;
    1027             : 
    1028         204 :             if (!bsearch(&relid, subrel_local_oids,
    1029          70 :                          tbl_count, sizeof(Oid), oid_cmp) &&
    1030          70 :                 !bsearch(&relid, subseq_local_oids,
    1031             :                          seq_count, sizeof(Oid), oid_cmp))
    1032             :             {
    1033          52 :                 AddSubscriptionRelState(sub->oid, relid,
    1034             :                                         copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
    1035             :                                         InvalidXLogRecPtr, true);
    1036          52 :                 ereport(DEBUG1,
    1037             :                         errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"",
    1038             :                                         relkind == RELKIND_SEQUENCE ? "sequence" : "table",
    1039             :                                         rv->schemaname, rv->relname, sub->name));
    1040             :             }
    1041             :         }
    1042             : 
    1043             :         /*
    1044             :          * Next remove state for tables we should not care about anymore using
    1045             :          * the data we collected above
    1046             :          */
    1047          74 :         qsort(pubrel_local_oids, list_length(pubrels), sizeof(Oid), oid_cmp);
    1048             : 
    1049         250 :         for (off = 0; off < tbl_count; off++)
    1050             :         {
    1051         176 :             Oid         relid = subrel_local_oids[off];
    1052             : 
    1053         176 :             if (!bsearch(&relid, pubrel_local_oids,
    1054         176 :                          list_length(pubrels), sizeof(Oid), oid_cmp))
    1055             :             {
    1056             :                 char        state;
    1057             :                 XLogRecPtr  statelsn;
    1058          42 :                 SubRemoveRels *remove_rel = palloc(sizeof(SubRemoveRels));
    1059             : 
    1060             :                 /*
    1061             :                  * Lock pg_subscription_rel with AccessExclusiveLock to
    1062             :                  * prevent any race conditions with the apply worker
    1063             :                  * re-launching workers at the same time this code is trying
    1064             :                  * to remove those tables.
    1065             :                  *
    1066             :                  * Even if new worker for this particular rel is restarted it
    1067             :                  * won't be able to make any progress as we hold exclusive
    1068             :                  * lock on pg_subscription_rel till the transaction end. It
    1069             :                  * will simply exit as there is no corresponding rel entry.
    1070             :                  *
    1071             :                  * This locking also ensures that the state of rels won't
    1072             :                  * change till we are done with this refresh operation.
    1073             :                  */
    1074          42 :                 if (!rel)
    1075          18 :                     rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
    1076             : 
    1077             :                 /* Last known rel state. */
    1078          42 :                 state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
    1079             : 
    1080          42 :                 RemoveSubscriptionRel(sub->oid, relid);
    1081             : 
    1082          42 :                 remove_rel->relid = relid;
    1083          42 :                 remove_rel->state = state;
    1084             : 
    1085          42 :                 sub_remove_rels = lappend(sub_remove_rels, remove_rel);
    1086             : 
    1087          42 :                 logicalrep_worker_stop(WORKERTYPE_TABLESYNC, sub->oid, relid);
    1088             : 
    1089             :                 /*
    1090             :                  * For READY state, we would have already dropped the
    1091             :                  * tablesync origin.
    1092             :                  */
    1093          42 :                 if (state != SUBREL_STATE_READY)
    1094             :                 {
    1095             :                     char        originname[NAMEDATALEN];
    1096             : 
    1097             :                     /*
    1098             :                      * Drop the tablesync's origin tracking if exists.
    1099             :                      *
    1100             :                      * It is possible that the origin is not yet created for
    1101             :                      * tablesync worker, this can happen for the states before
    1102             :                      * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or
    1103             :                      * apply worker can also concurrently try to drop the
    1104             :                      * origin and by this time the origin might be already
    1105             :                      * removed. For these reasons, passing missing_ok = true.
    1106             :                      */
    1107           0 :                     ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
    1108             :                                                        sizeof(originname));
    1109           0 :                     replorigin_drop_by_name(originname, true, false);
    1110             :                 }
    1111             : 
    1112          42 :                 ereport(DEBUG1,
    1113             :                         (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
    1114             :                                          get_namespace_name(get_rel_namespace(relid)),
    1115             :                                          get_rel_name(relid),
    1116             :                                          sub->name)));
    1117             :             }
    1118             :         }
    1119             : 
    1120             :         /*
    1121             :          * Drop the tablesync slots associated with removed tables. This has
    1122             :          * to be at the end because otherwise if there is an error while doing
    1123             :          * the database operations we won't be able to rollback dropped slots.
    1124             :          */
    1125         190 :         foreach_ptr(SubRemoveRels, rel, sub_remove_rels)
    1126             :         {
    1127          42 :             if (rel->state != SUBREL_STATE_READY &&
    1128           0 :                 rel->state != SUBREL_STATE_SYNCDONE)
    1129             :             {
    1130           0 :                 char        syncslotname[NAMEDATALEN] = {0};
    1131             : 
    1132             :                 /*
    1133             :                  * For READY/SYNCDONE states we know the tablesync slot has
    1134             :                  * already been dropped by the tablesync worker.
    1135             :                  *
    1136             :                  * For other states, there is no certainty, maybe the slot
    1137             :                  * does not exist yet. Also, if we fail after removing some of
    1138             :                  * the slots, next time, it will again try to drop already
    1139             :                  * dropped slots and fail. For these reasons, we allow
    1140             :                  * missing_ok = true for the drop.
    1141             :                  */
    1142           0 :                 ReplicationSlotNameForTablesync(sub->oid, rel->relid,
    1143             :                                                 syncslotname, sizeof(syncslotname));
    1144           0 :                 ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
    1145             :             }
    1146             :         }
    1147             : 
    1148             :         /*
    1149             :          * Next remove state for sequences we should not care about anymore
    1150             :          * using the data we collected above
    1151             :          */
    1152          92 :         for (off = 0; off < seq_count; off++)
    1153             :         {
    1154          18 :             Oid         relid = subseq_local_oids[off];
    1155             : 
    1156          18 :             if (!bsearch(&relid, pubrel_local_oids,
    1157          18 :                          list_length(pubrels), sizeof(Oid), oid_cmp))
    1158             :             {
    1159             :                 /*
    1160             :                  * This locking ensures that the state of rels won't change
    1161             :                  * till we are done with this refresh operation.
    1162             :                  */
    1163           0 :                 if (!rel)
    1164           0 :                     rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
    1165             : 
    1166           0 :                 RemoveSubscriptionRel(sub->oid, relid);
    1167             : 
    1168           0 :                 ereport(DEBUG1,
    1169             :                         errmsg_internal("sequence \"%s.%s\" removed from subscription \"%s\"",
    1170             :                                         get_namespace_name(get_rel_namespace(relid)),
    1171             :                                         get_rel_name(relid),
    1172             :                                         sub->name));
    1173             :             }
    1174             :         }
    1175             :     }
    1176           0 :     PG_FINALLY();
    1177             :     {
    1178          74 :         walrcv_disconnect(wrconn);
    1179             :     }
    1180          74 :     PG_END_TRY();
    1181             : 
    1182          74 :     if (rel)
    1183          18 :         table_close(rel, NoLock);
    1184          74 : }
    1185             : 
    1186             : /*
    1187             :  * Marks all sequences with INIT state.
    1188             :  */
    1189             : static void
    1190           2 : AlterSubscription_refresh_seq(Subscription *sub)
    1191             : {
    1192           2 :     char       *err = NULL;
    1193             :     WalReceiverConn *wrconn;
    1194             :     bool        must_use_password;
    1195             : 
    1196             :     /* Load the library providing us libpq calls. */
    1197           2 :     load_file("libpqwalreceiver", false);
    1198             : 
    1199             :     /* Try to connect to the publisher. */
    1200           2 :     must_use_password = sub->passwordrequired && !sub->ownersuperuser;
    1201           2 :     wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
    1202             :                             sub->name, &err);
    1203           2 :     if (!wrconn)
    1204           0 :         ereport(ERROR,
    1205             :                 errcode(ERRCODE_CONNECTION_FAILURE),
    1206             :                 errmsg("subscription \"%s\" could not connect to the publisher: %s",
    1207             :                        sub->name, err));
    1208             : 
    1209           2 :     PG_TRY();
    1210             :     {
    1211             :         List       *subrel_states;
    1212             : 
    1213           2 :         check_publications_origin_sequences(wrconn, sub->publications, true,
    1214             :                                             sub->origin, NULL, 0, sub->name);
    1215             : 
    1216             :         /* Get local sequence list. */
    1217           2 :         subrel_states = GetSubscriptionRelations(sub->oid, false, true, false);
    1218          10 :         foreach_ptr(SubscriptionRelState, subrel, subrel_states)
    1219             :         {
    1220           6 :             Oid         relid = subrel->relid;
    1221             : 
    1222           6 :             UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT,
    1223             :                                        InvalidXLogRecPtr, false);
    1224           6 :             ereport(DEBUG1,
    1225             :                     errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
    1226             :                                     get_namespace_name(get_rel_namespace(relid)),
    1227             :                                     get_rel_name(relid),
    1228             :                                     sub->name));
    1229             :         }
    1230             :     }
    1231           0 :     PG_FINALLY();
    1232             :     {
    1233           2 :         walrcv_disconnect(wrconn);
    1234             :     }
    1235           2 :     PG_END_TRY();
    1236           2 : }
    1237             : 
    1238             : /*
    1239             :  * Common checks for altering failover, two_phase, and retain_dead_tuples
    1240             :  * options.
    1241             :  */
    1242             : static void
    1243          26 : CheckAlterSubOption(Subscription *sub, const char *option,
    1244             :                     bool slot_needs_update, bool isTopLevel)
    1245             : {
    1246             :     Assert(strcmp(option, "failover") == 0 ||
    1247             :            strcmp(option, "two_phase") == 0 ||
    1248             :            strcmp(option, "retain_dead_tuples") == 0);
    1249             : 
    1250             :     /*
    1251             :      * Altering the retain_dead_tuples option does not update the slot on the
    1252             :      * publisher.
    1253             :      */
    1254             :     Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0);
    1255             : 
    1256             :     /*
    1257             :      * Do not allow changing the option if the subscription is enabled. This
    1258             :      * is because both failover and two_phase options of the slot on the
    1259             :      * publisher cannot be modified if the slot is currently acquired by the
    1260             :      * existing walsender.
    1261             :      *
    1262             :      * Note that two_phase is enabled (aka changed from 'false' to 'true') on
    1263             :      * the publisher by the existing walsender, so we could have allowed that
    1264             :      * even when the subscription is enabled. But we kept this restriction for
    1265             :      * the sake of consistency and simplicity.
    1266             :      *
    1267             :      * Additionally, do not allow changing the retain_dead_tuples option when
    1268             :      * the subscription is enabled to prevent race conditions arising from the
    1269             :      * new option value being acknowledged asynchronously by the launcher and
    1270             :      * apply workers.
    1271             :      *
    1272             :      * Without the restriction, a race condition may arise when a user
    1273             :      * disables and immediately re-enables the retain_dead_tuples option. In
    1274             :      * this case, the launcher might drop the slot upon noticing the disabled
    1275             :      * action, while the apply worker may keep maintaining
    1276             :      * oldest_nonremovable_xid without noticing the option change. During this
    1277             :      * period, a transaction ID wraparound could falsely make this ID appear
    1278             :      * as if it originates from the future w.r.t the transaction ID stored in
    1279             :      * the slot maintained by launcher.
    1280             :      *
    1281             :      * Similarly, if the user enables retain_dead_tuples concurrently with the
    1282             :      * launcher starting the worker, the apply worker may start calculating
    1283             :      * oldest_nonremovable_xid before the launcher notices the enable action.
    1284             :      * Consequently, the launcher may update slot.xmin to a newer value than
    1285             :      * that maintained by the worker. In subsequent cycles, upon integrating
    1286             :      * the worker's oldest_nonremovable_xid, the launcher might detect a
    1287             :      * retreat in the calculated xmin, necessitating additional handling.
    1288             :      *
    1289             :      * XXX To address the above race conditions, we can define
    1290             :      * oldest_nonremovable_xid as FullTransactionID and adds the check to
    1291             :      * disallow retreating the conflict slot's xmin. For now, we kept the
    1292             :      * implementation simple by disallowing change to the retain_dead_tuples,
    1293             :      * but in the future we can change this after some more analysis.
    1294             :      *
    1295             :      * Note that we could restrict only the enabling of retain_dead_tuples to
    1296             :      * avoid the race conditions described above, but we maintain the
    1297             :      * restriction for both enable and disable operations for the sake of
    1298             :      * consistency.
    1299             :      */
    1300          26 :     if (sub->enabled)
    1301           4 :         ereport(ERROR,
    1302             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1303             :                  errmsg("cannot set option \"%s\" for enabled subscription",
    1304             :                         option)));
    1305             : 
    1306          22 :     if (slot_needs_update)
    1307             :     {
    1308             :         StringInfoData cmd;
    1309             : 
    1310             :         /*
    1311             :          * A valid slot must be associated with the subscription for us to
    1312             :          * modify any of the slot's properties.
    1313             :          */
    1314          16 :         if (!sub->slotname)
    1315           0 :             ereport(ERROR,
    1316             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1317             :                      errmsg("cannot set option \"%s\" for a subscription that does not have a slot name",
    1318             :                             option)));
    1319             : 
    1320             :         /* The changed option of the slot can't be rolled back. */
    1321          16 :         initStringInfo(&cmd);
    1322          16 :         appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
    1323             : 
    1324          16 :         PreventInTransactionBlock(isTopLevel, cmd.data);
    1325          10 :         pfree(cmd.data);
    1326             :     }
    1327          16 : }
    1328             : 
    1329             : /*
    1330             :  * Alter the existing subscription.
    1331             :  */
    1332             : ObjectAddress
    1333         528 : AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
    1334             :                   bool isTopLevel)
    1335             : {
    1336             :     Relation    rel;
    1337             :     ObjectAddress myself;
    1338             :     bool        nulls[Natts_pg_subscription];
    1339             :     bool        replaces[Natts_pg_subscription];
    1340             :     Datum       values[Natts_pg_subscription];
    1341             :     HeapTuple   tup;
    1342             :     Oid         subid;
    1343         528 :     bool        update_tuple = false;
    1344         528 :     bool        update_failover = false;
    1345         528 :     bool        update_two_phase = false;
    1346         528 :     bool        check_pub_rdt = false;
    1347             :     bool        retain_dead_tuples;
    1348             :     int         max_retention;
    1349             :     bool        retention_active;
    1350             :     char       *origin;
    1351             :     Subscription *sub;
    1352             :     Form_pg_subscription form;
    1353             :     bits32      supported_opts;
    1354         528 :     SubOpts     opts = {0};
    1355             : 
    1356         528 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    1357             : 
    1358             :     /* Fetch the existing tuple. */
    1359         528 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
    1360             :                               CStringGetDatum(stmt->subname));
    1361             : 
    1362         528 :     if (!HeapTupleIsValid(tup))
    1363           6 :         ereport(ERROR,
    1364             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1365             :                  errmsg("subscription \"%s\" does not exist",
    1366             :                         stmt->subname)));
    1367             : 
    1368         522 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    1369         522 :     subid = form->oid;
    1370             : 
    1371             :     /* must be owner */
    1372         522 :     if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
    1373           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
    1374           0 :                        stmt->subname);
    1375             : 
    1376         522 :     sub = GetSubscription(subid, false);
    1377             : 
    1378         522 :     retain_dead_tuples = sub->retaindeadtuples;
    1379         522 :     origin = sub->origin;
    1380         522 :     max_retention = sub->maxretention;
    1381         522 :     retention_active = sub->retentionactive;
    1382             : 
    1383             :     /*
    1384             :      * Don't allow non-superuser modification of a subscription with
    1385             :      * password_required=false.
    1386             :      */
    1387         522 :     if (!sub->passwordrequired && !superuser())
    1388           0 :         ereport(ERROR,
    1389             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1390             :                  errmsg("password_required=false is superuser-only"),
    1391             :                  errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
    1392             : 
    1393             :     /* Lock the subscription so nobody else can do anything with it. */
    1394         522 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
    1395             : 
    1396             :     /* Form a new tuple. */
    1397         522 :     memset(values, 0, sizeof(values));
    1398         522 :     memset(nulls, false, sizeof(nulls));
    1399         522 :     memset(replaces, false, sizeof(replaces));
    1400             : 
    1401         522 :     switch (stmt->kind)
    1402             :     {
    1403         218 :         case ALTER_SUBSCRIPTION_OPTIONS:
    1404             :             {
    1405         218 :                 supported_opts = (SUBOPT_SLOT_NAME |
    1406             :                                   SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
    1407             :                                   SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
    1408             :                                   SUBOPT_DISABLE_ON_ERR |
    1409             :                                   SUBOPT_PASSWORD_REQUIRED |
    1410             :                                   SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
    1411             :                                   SUBOPT_RETAIN_DEAD_TUPLES |
    1412             :                                   SUBOPT_MAX_RETENTION_DURATION |
    1413             :                                   SUBOPT_ORIGIN);
    1414             : 
    1415         218 :                 parse_subscription_options(pstate, stmt->options,
    1416             :                                            supported_opts, &opts);
    1417             : 
    1418         200 :                 if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
    1419             :                 {
    1420             :                     /*
    1421             :                      * The subscription must be disabled to allow slot_name as
    1422             :                      * 'none', otherwise, the apply worker will repeatedly try
    1423             :                      * to stream the data using that slot_name which neither
    1424             :                      * exists on the publisher nor the user will be allowed to
    1425             :                      * create it.
    1426             :                      */
    1427          72 :                     if (sub->enabled && !opts.slot_name)
    1428           0 :                         ereport(ERROR,
    1429             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1430             :                                  errmsg("cannot set %s for enabled subscription",
    1431             :                                         "slot_name = NONE")));
    1432             : 
    1433          72 :                     if (opts.slot_name)
    1434           6 :                         values[Anum_pg_subscription_subslotname - 1] =
    1435           6 :                             DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
    1436             :                     else
    1437          66 :                         nulls[Anum_pg_subscription_subslotname - 1] = true;
    1438          72 :                     replaces[Anum_pg_subscription_subslotname - 1] = true;
    1439             :                 }
    1440             : 
    1441         200 :                 if (opts.synchronous_commit)
    1442             :                 {
    1443           6 :                     values[Anum_pg_subscription_subsynccommit - 1] =
    1444           6 :                         CStringGetTextDatum(opts.synchronous_commit);
    1445           6 :                     replaces[Anum_pg_subscription_subsynccommit - 1] = true;
    1446             :                 }
    1447             : 
    1448         200 :                 if (IsSet(opts.specified_opts, SUBOPT_BINARY))
    1449             :                 {
    1450          18 :                     values[Anum_pg_subscription_subbinary - 1] =
    1451          18 :                         BoolGetDatum(opts.binary);
    1452          18 :                     replaces[Anum_pg_subscription_subbinary - 1] = true;
    1453             :                 }
    1454             : 
    1455         200 :                 if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
    1456             :                 {
    1457          30 :                     values[Anum_pg_subscription_substream - 1] =
    1458          30 :                         CharGetDatum(opts.streaming);
    1459          30 :                     replaces[Anum_pg_subscription_substream - 1] = true;
    1460             :                 }
    1461             : 
    1462         200 :                 if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
    1463             :                 {
    1464             :                     values[Anum_pg_subscription_subdisableonerr - 1]
    1465           6 :                         = BoolGetDatum(opts.disableonerr);
    1466             :                     replaces[Anum_pg_subscription_subdisableonerr - 1]
    1467           6 :                         = true;
    1468             :                 }
    1469             : 
    1470         200 :                 if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
    1471             :                 {
    1472             :                     /* Non-superuser may not disable password_required. */
    1473          12 :                     if (!opts.passwordrequired && !superuser())
    1474           0 :                         ereport(ERROR,
    1475             :                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1476             :                                  errmsg("password_required=false is superuser-only"),
    1477             :                                  errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
    1478             : 
    1479             :                     values[Anum_pg_subscription_subpasswordrequired - 1]
    1480          12 :                         = BoolGetDatum(opts.passwordrequired);
    1481             :                     replaces[Anum_pg_subscription_subpasswordrequired - 1]
    1482          12 :                         = true;
    1483             :                 }
    1484             : 
    1485         200 :                 if (IsSet(opts.specified_opts, SUBOPT_RUN_AS_OWNER))
    1486             :                 {
    1487          14 :                     values[Anum_pg_subscription_subrunasowner - 1] =
    1488          14 :                         BoolGetDatum(opts.runasowner);
    1489          14 :                     replaces[Anum_pg_subscription_subrunasowner - 1] = true;
    1490             :                 }
    1491             : 
    1492         200 :                 if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
    1493             :                 {
    1494             :                     /*
    1495             :                      * We need to update both the slot and the subscription
    1496             :                      * for the two_phase option. We can enable the two_phase
    1497             :                      * option for a slot only once the initial data
    1498             :                      * synchronization is done. This is to avoid missing some
    1499             :                      * data as explained in comments atop worker.c.
    1500             :                      */
    1501           6 :                     update_two_phase = !opts.twophase;
    1502             : 
    1503           6 :                     CheckAlterSubOption(sub, "two_phase", update_two_phase,
    1504             :                                         isTopLevel);
    1505             : 
    1506             :                     /*
    1507             :                      * Modifying the two_phase slot option requires a slot
    1508             :                      * lookup by slot name, so changing the slot name at the
    1509             :                      * same time is not allowed.
    1510             :                      */
    1511           6 :                     if (update_two_phase &&
    1512           2 :                         IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
    1513           0 :                         ereport(ERROR,
    1514             :                                 (errcode(ERRCODE_SYNTAX_ERROR),
    1515             :                                  errmsg("\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
    1516             : 
    1517             :                     /*
    1518             :                      * Note that workers may still survive even if the
    1519             :                      * subscription has been disabled.
    1520             :                      *
    1521             :                      * Ensure workers have already been exited to avoid
    1522             :                      * getting prepared transactions while we are disabling
    1523             :                      * the two_phase option. Otherwise, the changes of an
    1524             :                      * already prepared transaction can be replicated again
    1525             :                      * along with its corresponding commit, leading to
    1526             :                      * duplicate data or errors.
    1527             :                      */
    1528           6 :                     if (logicalrep_workers_find(subid, true, true))
    1529           0 :                         ereport(ERROR,
    1530             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1531             :                                  errmsg("cannot alter \"two_phase\" when logical replication worker is still running"),
    1532             :                                  errhint("Try again after some time.")));
    1533             : 
    1534             :                     /*
    1535             :                      * two_phase cannot be disabled if there are any
    1536             :                      * uncommitted prepared transactions present otherwise it
    1537             :                      * can lead to duplicate data or errors as explained in
    1538             :                      * the comment above.
    1539             :                      */
    1540           6 :                     if (update_two_phase &&
    1541           2 :                         sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
    1542           2 :                         LookupGXactBySubid(subid))
    1543           0 :                         ereport(ERROR,
    1544             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1545             :                                  errmsg("cannot disable \"two_phase\" when prepared transactions exist"),
    1546             :                                  errhint("Resolve these transactions and try again.")));
    1547             : 
    1548             :                     /* Change system catalog accordingly */
    1549           6 :                     values[Anum_pg_subscription_subtwophasestate - 1] =
    1550           6 :                         CharGetDatum(opts.twophase ?
    1551             :                                      LOGICALREP_TWOPHASE_STATE_PENDING :
    1552             :                                      LOGICALREP_TWOPHASE_STATE_DISABLED);
    1553           6 :                     replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
    1554             :                 }
    1555             : 
    1556         200 :                 if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
    1557             :                 {
    1558             :                     /*
    1559             :                      * Similar to the two_phase case above, we need to update
    1560             :                      * the failover option for both the slot and the
    1561             :                      * subscription.
    1562             :                      */
    1563          16 :                     update_failover = true;
    1564             : 
    1565          16 :                     CheckAlterSubOption(sub, "failover", update_failover,
    1566             :                                         isTopLevel);
    1567             : 
    1568           8 :                     values[Anum_pg_subscription_subfailover - 1] =
    1569           8 :                         BoolGetDatum(opts.failover);
    1570           8 :                     replaces[Anum_pg_subscription_subfailover - 1] = true;
    1571             :                 }
    1572             : 
    1573         192 :                 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
    1574             :                 {
    1575           4 :                     values[Anum_pg_subscription_subretaindeadtuples - 1] =
    1576           4 :                         BoolGetDatum(opts.retaindeadtuples);
    1577           4 :                     replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true;
    1578             : 
    1579             :                     /*
    1580             :                      * Update the retention status only if there's a change in
    1581             :                      * the retain_dead_tuples option value.
    1582             :                      *
    1583             :                      * Automatically marking retention as active when
    1584             :                      * retain_dead_tuples is enabled may not always be ideal,
    1585             :                      * especially if retention was previously stopped and the
    1586             :                      * user toggles retain_dead_tuples without adjusting the
    1587             :                      * publisher workload. However, this behavior provides a
    1588             :                      * convenient way for users to manually refresh the
    1589             :                      * retention status. Since retention will be stopped again
    1590             :                      * unless the publisher workload is reduced, this approach
    1591             :                      * is acceptable for now.
    1592             :                      */
    1593           4 :                     if (opts.retaindeadtuples != sub->retaindeadtuples)
    1594             :                     {
    1595           4 :                         values[Anum_pg_subscription_subretentionactive - 1] =
    1596           4 :                             BoolGetDatum(opts.retaindeadtuples);
    1597           4 :                         replaces[Anum_pg_subscription_subretentionactive - 1] = true;
    1598             : 
    1599           4 :                         retention_active = opts.retaindeadtuples;
    1600             :                     }
    1601             : 
    1602           4 :                     CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
    1603             : 
    1604             :                     /*
    1605             :                      * Workers may continue running even after the
    1606             :                      * subscription has been disabled.
    1607             :                      *
    1608             :                      * To prevent race conditions (as described in
    1609             :                      * CheckAlterSubOption()), ensure that all worker
    1610             :                      * processes have already exited before proceeding.
    1611             :                      */
    1612           2 :                     if (logicalrep_workers_find(subid, true, true))
    1613           0 :                         ereport(ERROR,
    1614             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1615             :                                  errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
    1616             :                                  errhint("Try again after some time.")));
    1617             : 
    1618             :                     /*
    1619             :                      * Notify the launcher to manage the replication slot for
    1620             :                      * conflict detection. This ensures that replication slot
    1621             :                      * is efficiently handled (created, updated, or dropped)
    1622             :                      * in response to any configuration changes.
    1623             :                      */
    1624           2 :                     ApplyLauncherWakeupAtCommit();
    1625             : 
    1626           2 :                     check_pub_rdt = opts.retaindeadtuples;
    1627           2 :                     retain_dead_tuples = opts.retaindeadtuples;
    1628             :                 }
    1629             : 
    1630         190 :                 if (IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
    1631             :                 {
    1632          10 :                     values[Anum_pg_subscription_submaxretention - 1] =
    1633          10 :                         Int32GetDatum(opts.maxretention);
    1634          10 :                     replaces[Anum_pg_subscription_submaxretention - 1] = true;
    1635             : 
    1636          10 :                     max_retention = opts.maxretention;
    1637             :                 }
    1638             : 
    1639             :                 /*
    1640             :                  * Ensure that system configuration parameters are set
    1641             :                  * appropriately to support retain_dead_tuples and
    1642             :                  * max_retention_duration.
    1643             :                  */
    1644         190 :                 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) ||
    1645         188 :                     IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
    1646          12 :                     CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE,
    1647             :                                                retain_dead_tuples,
    1648             :                                                retention_active,
    1649          12 :                                                (max_retention > 0));
    1650             : 
    1651         190 :                 if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
    1652             :                 {
    1653          10 :                     values[Anum_pg_subscription_suborigin - 1] =
    1654          10 :                         CStringGetTextDatum(opts.origin);
    1655          10 :                     replaces[Anum_pg_subscription_suborigin - 1] = true;
    1656             : 
    1657             :                     /*
    1658             :                      * Check if changes from different origins may be received
    1659             :                      * from the publisher when the origin is changed to ANY
    1660             :                      * and retain_dead_tuples is enabled.
    1661             :                      */
    1662          14 :                     check_pub_rdt = retain_dead_tuples &&
    1663           4 :                         pg_strcasecmp(opts.origin, LOGICALREP_ORIGIN_ANY) == 0;
    1664             : 
    1665          10 :                     origin = opts.origin;
    1666             :                 }
    1667             : 
    1668         190 :                 update_tuple = true;
    1669         190 :                 break;
    1670             :             }
    1671             : 
    1672         104 :         case ALTER_SUBSCRIPTION_ENABLED:
    1673             :             {
    1674         104 :                 parse_subscription_options(pstate, stmt->options,
    1675             :                                            SUBOPT_ENABLED, &opts);
    1676             :                 Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
    1677             : 
    1678         104 :                 if (!sub->slotname && opts.enabled)
    1679           6 :                     ereport(ERROR,
    1680             :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1681             :                              errmsg("cannot enable subscription that does not have a slot name")));
    1682             : 
    1683             :                 /*
    1684             :                  * Check track_commit_timestamp only when enabling the
    1685             :                  * subscription in case it was disabled after creation. See
    1686             :                  * comments atop CheckSubDeadTupleRetention() for details.
    1687             :                  */
    1688          98 :                 CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
    1689          98 :                                            WARNING, sub->retaindeadtuples,
    1690          98 :                                            sub->retentionactive, false);
    1691             : 
    1692          98 :                 values[Anum_pg_subscription_subenabled - 1] =
    1693          98 :                     BoolGetDatum(opts.enabled);
    1694          98 :                 replaces[Anum_pg_subscription_subenabled - 1] = true;
    1695             : 
    1696          98 :                 if (opts.enabled)
    1697          54 :                     ApplyLauncherWakeupAtCommit();
    1698             : 
    1699          98 :                 update_tuple = true;
    1700             : 
    1701             :                 /*
    1702             :                  * The subscription might be initially created with
    1703             :                  * connect=false and retain_dead_tuples=true, meaning the
    1704             :                  * remote server's status may not be checked. Ensure this
    1705             :                  * check is conducted now.
    1706             :                  */
    1707          98 :                 check_pub_rdt = sub->retaindeadtuples && opts.enabled;
    1708          98 :                 break;
    1709             :             }
    1710             : 
    1711          20 :         case ALTER_SUBSCRIPTION_CONNECTION:
    1712             :             /* Load the library providing us libpq calls. */
    1713          20 :             load_file("libpqwalreceiver", false);
    1714             :             /* Check the connection info string. */
    1715          20 :             walrcv_check_conninfo(stmt->conninfo,
    1716             :                                   sub->passwordrequired && !sub->ownersuperuser);
    1717             : 
    1718          14 :             values[Anum_pg_subscription_subconninfo - 1] =
    1719          14 :                 CStringGetTextDatum(stmt->conninfo);
    1720          14 :             replaces[Anum_pg_subscription_subconninfo - 1] = true;
    1721          14 :             update_tuple = true;
    1722             : 
    1723             :             /*
    1724             :              * Since the remote server configuration might have changed,
    1725             :              * perform a check to ensure it permits enabling
    1726             :              * retain_dead_tuples.
    1727             :              */
    1728          14 :             check_pub_rdt = sub->retaindeadtuples;
    1729          14 :             break;
    1730             : 
    1731          32 :         case ALTER_SUBSCRIPTION_SET_PUBLICATION:
    1732             :             {
    1733          32 :                 supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
    1734          32 :                 parse_subscription_options(pstate, stmt->options,
    1735             :                                            supported_opts, &opts);
    1736             : 
    1737          32 :                 values[Anum_pg_subscription_subpublications - 1] =
    1738          32 :                     publicationListToArray(stmt->publication);
    1739          32 :                 replaces[Anum_pg_subscription_subpublications - 1] = true;
    1740             : 
    1741          32 :                 update_tuple = true;
    1742             : 
    1743             :                 /* Refresh if user asked us to. */
    1744          32 :                 if (opts.refresh)
    1745             :                 {
    1746          26 :                     if (!sub->enabled)
    1747           0 :                         ereport(ERROR,
    1748             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1749             :                                  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
    1750             :                                  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
    1751             : 
    1752             :                     /*
    1753             :                      * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
    1754             :                      * why this is not allowed.
    1755             :                      */
    1756          26 :                     if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
    1757           0 :                         ereport(ERROR,
    1758             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1759             :                                  errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
    1760             :                                  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
    1761             : 
    1762          26 :                     PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
    1763             : 
    1764             :                     /* Make sure refresh sees the new list of publications. */
    1765          14 :                     sub->publications = stmt->publication;
    1766             : 
    1767          14 :                     AlterSubscription_refresh(sub, opts.copy_data,
    1768             :                                               stmt->publication);
    1769             :                 }
    1770             : 
    1771          20 :                 break;
    1772             :             }
    1773             : 
    1774          54 :         case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
    1775             :         case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
    1776             :             {
    1777             :                 List       *publist;
    1778          54 :                 bool        isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
    1779             : 
    1780          54 :                 supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
    1781          54 :                 parse_subscription_options(pstate, stmt->options,
    1782             :                                            supported_opts, &opts);
    1783             : 
    1784          54 :                 publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
    1785          18 :                 values[Anum_pg_subscription_subpublications - 1] =
    1786          18 :                     publicationListToArray(publist);
    1787          18 :                 replaces[Anum_pg_subscription_subpublications - 1] = true;
    1788             : 
    1789          18 :                 update_tuple = true;
    1790             : 
    1791             :                 /* Refresh if user asked us to. */
    1792          18 :                 if (opts.refresh)
    1793             :                 {
    1794             :                     /* We only need to validate user specified publications. */
    1795           6 :                     List       *validate_publications = (isadd) ? stmt->publication : NULL;
    1796             : 
    1797           6 :                     if (!sub->enabled)
    1798           0 :                         ereport(ERROR,
    1799             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1800             :                                  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
    1801             :                         /* translator: %s is an SQL ALTER command */
    1802             :                                  errhint("Use %s instead.",
    1803             :                                          isadd ?
    1804             :                                          "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
    1805             :                                          "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
    1806             : 
    1807             :                     /*
    1808             :                      * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
    1809             :                      * why this is not allowed.
    1810             :                      */
    1811           6 :                     if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
    1812           0 :                         ereport(ERROR,
    1813             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1814             :                                  errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
    1815             :                         /* translator: %s is an SQL ALTER command */
    1816             :                                  errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
    1817             :                                          isadd ?
    1818             :                                          "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
    1819             :                                          "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
    1820             : 
    1821           6 :                     PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
    1822             : 
    1823             :                     /* Refresh the new list of publications. */
    1824           6 :                     sub->publications = publist;
    1825             : 
    1826           6 :                     AlterSubscription_refresh(sub, opts.copy_data,
    1827             :                                               validate_publications);
    1828             :                 }
    1829             : 
    1830          18 :                 break;
    1831             :             }
    1832             : 
    1833          68 :         case ALTER_SUBSCRIPTION_REFRESH_PUBLICATION:
    1834             :             {
    1835          68 :                 if (!sub->enabled)
    1836           6 :                     ereport(ERROR,
    1837             :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1838             :                              errmsg("%s is not allowed for disabled subscriptions",
    1839             :                                     "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
    1840             : 
    1841          62 :                 parse_subscription_options(pstate, stmt->options,
    1842             :                                            SUBOPT_COPY_DATA, &opts);
    1843             : 
    1844             :                 /*
    1845             :                  * The subscription option "two_phase" requires that
    1846             :                  * replication has passed the initial table synchronization
    1847             :                  * phase before the two_phase becomes properly enabled.
    1848             :                  *
    1849             :                  * But, having reached this two-phase commit "enabled" state
    1850             :                  * we must not allow any subsequent table initialization to
    1851             :                  * occur. So the ALTER SUBSCRIPTION ... REFRESH PUBLICATION is
    1852             :                  * disallowed when the user had requested two_phase = on mode.
    1853             :                  *
    1854             :                  * The exception to this restriction is when copy_data =
    1855             :                  * false, because when copy_data is false the tablesync will
    1856             :                  * start already in READY state and will exit directly without
    1857             :                  * doing anything.
    1858             :                  *
    1859             :                  * For more details see comments atop worker.c.
    1860             :                  */
    1861          62 :                 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
    1862           0 :                     ereport(ERROR,
    1863             :                             (errcode(ERRCODE_SYNTAX_ERROR),
    1864             :                              errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"),
    1865             :                              errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
    1866             : 
    1867          62 :                 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION");
    1868             : 
    1869          56 :                 AlterSubscription_refresh(sub, opts.copy_data, NULL);
    1870             : 
    1871          54 :                 break;
    1872             :             }
    1873             : 
    1874           2 :         case ALTER_SUBSCRIPTION_REFRESH_SEQUENCES:
    1875             :             {
    1876           2 :                 if (!sub->enabled)
    1877           0 :                     ereport(ERROR,
    1878             :                             errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1879             :                             errmsg("%s is not allowed for disabled subscriptions",
    1880             :                                    "ALTER SUBSCRIPTION ... REFRESH SEQUENCES"));
    1881             : 
    1882           2 :                 AlterSubscription_refresh_seq(sub);
    1883             : 
    1884           2 :                 break;
    1885             :             }
    1886             : 
    1887          24 :         case ALTER_SUBSCRIPTION_SKIP:
    1888             :             {
    1889          24 :                 parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
    1890             : 
    1891             :                 /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
    1892             :                 Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
    1893             : 
    1894             :                 /*
    1895             :                  * If the user sets subskiplsn, we do a sanity check to make
    1896             :                  * sure that the specified LSN is a probable value.
    1897             :                  */
    1898          18 :                 if (XLogRecPtrIsValid(opts.lsn))
    1899             :                 {
    1900             :                     RepOriginId originid;
    1901             :                     char        originname[NAMEDATALEN];
    1902             :                     XLogRecPtr  remote_lsn;
    1903             : 
    1904          12 :                     ReplicationOriginNameForLogicalRep(subid, InvalidOid,
    1905             :                                                        originname, sizeof(originname));
    1906          12 :                     originid = replorigin_by_name(originname, false);
    1907          12 :                     remote_lsn = replorigin_get_progress(originid, false);
    1908             : 
    1909             :                     /* Check the given LSN is at least a future LSN */
    1910          12 :                     if (XLogRecPtrIsValid(remote_lsn) && opts.lsn < remote_lsn)
    1911           0 :                         ereport(ERROR,
    1912             :                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1913             :                                  errmsg("skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
    1914             :                                         LSN_FORMAT_ARGS(opts.lsn),
    1915             :                                         LSN_FORMAT_ARGS(remote_lsn))));
    1916             :                 }
    1917             : 
    1918          18 :                 values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
    1919          18 :                 replaces[Anum_pg_subscription_subskiplsn - 1] = true;
    1920             : 
    1921          18 :                 update_tuple = true;
    1922          18 :                 break;
    1923             :             }
    1924             : 
    1925           0 :         default:
    1926           0 :             elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
    1927             :                  stmt->kind);
    1928             :     }
    1929             : 
    1930             :     /* Update the catalog if needed. */
    1931         414 :     if (update_tuple)
    1932             :     {
    1933         358 :         tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
    1934             :                                 replaces);
    1935             : 
    1936         358 :         CatalogTupleUpdate(rel, &tup->t_self, tup);
    1937             : 
    1938         358 :         heap_freetuple(tup);
    1939             :     }
    1940             : 
    1941             :     /*
    1942             :      * Try to acquire the connection necessary either for modifying the slot
    1943             :      * or for checking if the remote server permits enabling
    1944             :      * retain_dead_tuples.
    1945             :      *
    1946             :      * This has to be at the end because otherwise if there is an error while
    1947             :      * doing the database operations we won't be able to rollback altered
    1948             :      * slot.
    1949             :      */
    1950         414 :     if (update_failover || update_two_phase || check_pub_rdt)
    1951             :     {
    1952             :         bool        must_use_password;
    1953             :         char       *err;
    1954             :         WalReceiverConn *wrconn;
    1955             : 
    1956             :         /* Load the library providing us libpq calls. */
    1957          26 :         load_file("libpqwalreceiver", false);
    1958             : 
    1959             :         /*
    1960             :          * Try to connect to the publisher, using the new connection string if
    1961             :          * available.
    1962             :          */
    1963          26 :         must_use_password = sub->passwordrequired && !sub->ownersuperuser;
    1964          26 :         wrconn = walrcv_connect(stmt->conninfo ? stmt->conninfo : sub->conninfo,
    1965             :                                 true, true, must_use_password, sub->name,
    1966             :                                 &err);
    1967          26 :         if (!wrconn)
    1968           0 :             ereport(ERROR,
    1969             :                     (errcode(ERRCODE_CONNECTION_FAILURE),
    1970             :                      errmsg("subscription \"%s\" could not connect to the publisher: %s",
    1971             :                             sub->name, err)));
    1972             : 
    1973          26 :         PG_TRY();
    1974             :         {
    1975          26 :             if (retain_dead_tuples)
    1976          18 :                 check_pub_dead_tuple_retention(wrconn);
    1977             : 
    1978          26 :             check_publications_origin_tables(wrconn, sub->publications, false,
    1979             :                                              retain_dead_tuples, origin, NULL, 0,
    1980             :                                              sub->name);
    1981             : 
    1982          26 :             if (update_failover || update_two_phase)
    1983          10 :                 walrcv_alter_slot(wrconn, sub->slotname,
    1984             :                                   update_failover ? &opts.failover : NULL,
    1985             :                                   update_two_phase ? &opts.twophase : NULL);
    1986             :         }
    1987           0 :         PG_FINALLY();
    1988             :         {
    1989          26 :             walrcv_disconnect(wrconn);
    1990             :         }
    1991          26 :         PG_END_TRY();
    1992             :     }
    1993             : 
    1994         414 :     table_close(rel, RowExclusiveLock);
    1995             : 
    1996         414 :     ObjectAddressSet(myself, SubscriptionRelationId, subid);
    1997             : 
    1998         414 :     InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
    1999             : 
    2000             :     /* Wake up related replication workers to handle this change quickly. */
    2001         414 :     LogicalRepWorkersWakeupAtCommit(subid);
    2002             : 
    2003         414 :     return myself;
    2004             : }
    2005             : 
    2006             : /*
    2007             :  * Drop a subscription
    2008             :  */
    2009             : void
    2010         248 : DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
    2011             : {
    2012             :     Relation    rel;
    2013             :     ObjectAddress myself;
    2014             :     HeapTuple   tup;
    2015             :     Oid         subid;
    2016             :     Oid         subowner;
    2017             :     Datum       datum;
    2018             :     bool        isnull;
    2019             :     char       *subname;
    2020             :     char       *conninfo;
    2021             :     char       *slotname;
    2022             :     List       *subworkers;
    2023             :     ListCell   *lc;
    2024             :     char        originname[NAMEDATALEN];
    2025         248 :     char       *err = NULL;
    2026             :     WalReceiverConn *wrconn;
    2027             :     Form_pg_subscription form;
    2028             :     List       *rstates;
    2029             :     bool        must_use_password;
    2030             : 
    2031             :     /*
    2032             :      * The launcher may concurrently start a new worker for this subscription.
    2033             :      * During initialization, the worker checks for subscription validity and
    2034             :      * exits if the subscription has already been dropped. See
    2035             :      * InitializeLogRepWorker.
    2036             :      */
    2037         248 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    2038             : 
    2039         248 :     tup = SearchSysCache2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
    2040         248 :                           CStringGetDatum(stmt->subname));
    2041             : 
    2042         248 :     if (!HeapTupleIsValid(tup))
    2043             :     {
    2044          12 :         table_close(rel, NoLock);
    2045             : 
    2046          12 :         if (!stmt->missing_ok)
    2047           6 :             ereport(ERROR,
    2048             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    2049             :                      errmsg("subscription \"%s\" does not exist",
    2050             :                             stmt->subname)));
    2051             :         else
    2052           6 :             ereport(NOTICE,
    2053             :                     (errmsg("subscription \"%s\" does not exist, skipping",
    2054             :                             stmt->subname)));
    2055             : 
    2056          90 :         return;
    2057             :     }
    2058             : 
    2059         236 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    2060         236 :     subid = form->oid;
    2061         236 :     subowner = form->subowner;
    2062         236 :     must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
    2063             : 
    2064             :     /* must be owner */
    2065         236 :     if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
    2066           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
    2067           0 :                        stmt->subname);
    2068             : 
    2069             :     /* DROP hook for the subscription being removed */
    2070         236 :     InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
    2071             : 
    2072             :     /*
    2073             :      * Lock the subscription so nobody else can do anything with it (including
    2074             :      * the replication workers).
    2075             :      */
    2076         236 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
    2077             : 
    2078             :     /* Get subname */
    2079         236 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
    2080             :                                    Anum_pg_subscription_subname);
    2081         236 :     subname = pstrdup(NameStr(*DatumGetName(datum)));
    2082             : 
    2083             :     /* Get conninfo */
    2084         236 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
    2085             :                                    Anum_pg_subscription_subconninfo);
    2086         236 :     conninfo = TextDatumGetCString(datum);
    2087             : 
    2088             :     /* Get slotname */
    2089         236 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
    2090             :                             Anum_pg_subscription_subslotname, &isnull);
    2091         236 :     if (!isnull)
    2092         150 :         slotname = pstrdup(NameStr(*DatumGetName(datum)));
    2093             :     else
    2094          86 :         slotname = NULL;
    2095             : 
    2096             :     /*
    2097             :      * Since dropping a replication slot is not transactional, the replication
    2098             :      * slot stays dropped even if the transaction rolls back.  So we cannot
    2099             :      * run DROP SUBSCRIPTION inside a transaction block if dropping the
    2100             :      * replication slot.  Also, in this case, we report a message for dropping
    2101             :      * the subscription to the cumulative stats system.
    2102             :      *
    2103             :      * XXX The command name should really be something like "DROP SUBSCRIPTION
    2104             :      * of a subscription that is associated with a replication slot", but we
    2105             :      * don't have the proper facilities for that.
    2106             :      */
    2107         236 :     if (slotname)
    2108         150 :         PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
    2109             : 
    2110         230 :     ObjectAddressSet(myself, SubscriptionRelationId, subid);
    2111         230 :     EventTriggerSQLDropAddObject(&myself, true, true);
    2112             : 
    2113             :     /* Remove the tuple from catalog. */
    2114         230 :     CatalogTupleDelete(rel, &tup->t_self);
    2115             : 
    2116         230 :     ReleaseSysCache(tup);
    2117             : 
    2118             :     /*
    2119             :      * Stop all the subscription workers immediately.
    2120             :      *
    2121             :      * This is necessary if we are dropping the replication slot, so that the
    2122             :      * slot becomes accessible.
    2123             :      *
    2124             :      * It is also necessary if the subscription is disabled and was disabled
    2125             :      * in the same transaction.  Then the workers haven't seen the disabling
    2126             :      * yet and will still be running, leading to hangs later when we want to
    2127             :      * drop the replication origin.  If the subscription was disabled before
    2128             :      * this transaction, then there shouldn't be any workers left, so this
    2129             :      * won't make a difference.
    2130             :      *
    2131             :      * New workers won't be started because we hold an exclusive lock on the
    2132             :      * subscription till the end of the transaction.
    2133             :      */
    2134         230 :     subworkers = logicalrep_workers_find(subid, false, true);
    2135         378 :     foreach(lc, subworkers)
    2136             :     {
    2137         148 :         LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
    2138             : 
    2139         148 :         logicalrep_worker_stop(w->type, w->subid, w->relid);
    2140             :     }
    2141         230 :     list_free(subworkers);
    2142             : 
    2143             :     /*
    2144             :      * Remove the no-longer-useful entry in the launcher's table of apply
    2145             :      * worker start times.
    2146             :      *
    2147             :      * If this transaction rolls back, the launcher might restart a failed
    2148             :      * apply worker before wal_retrieve_retry_interval milliseconds have
    2149             :      * elapsed, but that's pretty harmless.
    2150             :      */
    2151         230 :     ApplyLauncherForgetWorkerStartTime(subid);
    2152             : 
    2153             :     /*
    2154             :      * Cleanup of tablesync replication origins.
    2155             :      *
    2156             :      * Any READY-state relations would already have dealt with clean-ups.
    2157             :      *
    2158             :      * Note that the state can't change because we have already stopped both
    2159             :      * the apply and tablesync workers and they can't restart because of
    2160             :      * exclusive lock on the subscription.
    2161             :      */
    2162         230 :     rstates = GetSubscriptionRelations(subid, true, false, true);
    2163         242 :     foreach(lc, rstates)
    2164             :     {
    2165          12 :         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
    2166          12 :         Oid         relid = rstate->relid;
    2167             : 
    2168             :         /* Only cleanup resources of tablesync workers */
    2169          12 :         if (!OidIsValid(relid))
    2170           0 :             continue;
    2171             : 
    2172             :         /*
    2173             :          * Drop the tablesync's origin tracking if exists.
    2174             :          *
    2175             :          * It is possible that the origin is not yet created for tablesync
    2176             :          * worker so passing missing_ok = true. This can happen for the states
    2177             :          * before SUBREL_STATE_FINISHEDCOPY.
    2178             :          */
    2179          12 :         ReplicationOriginNameForLogicalRep(subid, relid, originname,
    2180             :                                            sizeof(originname));
    2181          12 :         replorigin_drop_by_name(originname, true, false);
    2182             :     }
    2183             : 
    2184             :     /* Clean up dependencies */
    2185         230 :     deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
    2186             : 
    2187             :     /* Remove any associated relation synchronization states. */
    2188         230 :     RemoveSubscriptionRel(subid, InvalidOid);
    2189             : 
    2190             :     /* Remove the origin tracking if exists. */
    2191         230 :     ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
    2192         230 :     replorigin_drop_by_name(originname, true, false);
    2193             : 
    2194             :     /*
    2195             :      * Tell the cumulative stats system that the subscription is getting
    2196             :      * dropped.
    2197             :      */
    2198         230 :     pgstat_drop_subscription(subid);
    2199             : 
    2200             :     /*
    2201             :      * If there is no slot associated with the subscription, we can finish
    2202             :      * here.
    2203             :      */
    2204         230 :     if (!slotname && rstates == NIL)
    2205             :     {
    2206          84 :         table_close(rel, NoLock);
    2207          84 :         return;
    2208             :     }
    2209             : 
    2210             :     /*
    2211             :      * Try to acquire the connection necessary for dropping slots.
    2212             :      *
    2213             :      * Note: If the slotname is NONE/NULL then we allow the command to finish
    2214             :      * and users need to manually cleanup the apply and tablesync worker slots
    2215             :      * later.
    2216             :      *
    2217             :      * This has to be at the end because otherwise if there is an error while
    2218             :      * doing the database operations we won't be able to rollback dropped
    2219             :      * slot.
    2220             :      */
    2221         146 :     load_file("libpqwalreceiver", false);
    2222             : 
    2223         146 :     wrconn = walrcv_connect(conninfo, true, true, must_use_password,
    2224             :                             subname, &err);
    2225         146 :     if (wrconn == NULL)
    2226             :     {
    2227           0 :         if (!slotname)
    2228             :         {
    2229             :             /* be tidy */
    2230           0 :             list_free(rstates);
    2231           0 :             table_close(rel, NoLock);
    2232           0 :             return;
    2233             :         }
    2234             :         else
    2235             :         {
    2236           0 :             ReportSlotConnectionError(rstates, subid, slotname, err);
    2237             :         }
    2238             :     }
    2239             : 
    2240         146 :     PG_TRY();
    2241             :     {
    2242         158 :         foreach(lc, rstates)
    2243             :         {
    2244          12 :             SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
    2245          12 :             Oid         relid = rstate->relid;
    2246             : 
    2247             :             /* Only cleanup resources of tablesync workers */
    2248          12 :             if (!OidIsValid(relid))
    2249           0 :                 continue;
    2250             : 
    2251             :             /*
    2252             :              * Drop the tablesync slots associated with removed tables.
    2253             :              *
    2254             :              * For SYNCDONE/READY states, the tablesync slot is known to have
    2255             :              * already been dropped by the tablesync worker.
    2256             :              *
    2257             :              * For other states, there is no certainty, maybe the slot does
    2258             :              * not exist yet. Also, if we fail after removing some of the
    2259             :              * slots, next time, it will again try to drop already dropped
    2260             :              * slots and fail. For these reasons, we allow missing_ok = true
    2261             :              * for the drop.
    2262             :              */
    2263          12 :             if (rstate->state != SUBREL_STATE_SYNCDONE)
    2264             :             {
    2265          10 :                 char        syncslotname[NAMEDATALEN] = {0};
    2266             : 
    2267          10 :                 ReplicationSlotNameForTablesync(subid, relid, syncslotname,
    2268             :                                                 sizeof(syncslotname));
    2269          10 :                 ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
    2270             :             }
    2271             :         }
    2272             : 
    2273         146 :         list_free(rstates);
    2274             : 
    2275             :         /*
    2276             :          * If there is a slot associated with the subscription, then drop the
    2277             :          * replication slot at the publisher.
    2278             :          */
    2279         146 :         if (slotname)
    2280         144 :             ReplicationSlotDropAtPubNode(wrconn, slotname, false);
    2281             :     }
    2282           2 :     PG_FINALLY();
    2283             :     {
    2284         146 :         walrcv_disconnect(wrconn);
    2285             :     }
    2286         146 :     PG_END_TRY();
    2287             : 
    2288         144 :     table_close(rel, NoLock);
    2289             : }
    2290             : 
    2291             : /*
    2292             :  * Drop the replication slot at the publisher node using the replication
    2293             :  * connection.
    2294             :  *
    2295             :  * missing_ok - if true then only issue a LOG message if the slot doesn't
    2296             :  * exist.
    2297             :  */
    2298             : void
    2299         540 : ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
    2300             : {
    2301             :     StringInfoData cmd;
    2302             : 
    2303             :     Assert(wrconn);
    2304             : 
    2305         540 :     load_file("libpqwalreceiver", false);
    2306             : 
    2307         540 :     initStringInfo(&cmd);
    2308         540 :     appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
    2309             : 
    2310         540 :     PG_TRY();
    2311             :     {
    2312             :         WalRcvExecResult *res;
    2313             : 
    2314         540 :         res = walrcv_exec(wrconn, cmd.data, 0, NULL);
    2315             : 
    2316         540 :         if (res->status == WALRCV_OK_COMMAND)
    2317             :         {
    2318             :             /* NOTICE. Success. */
    2319         534 :             ereport(NOTICE,
    2320             :                     (errmsg("dropped replication slot \"%s\" on publisher",
    2321             :                             slotname)));
    2322             :         }
    2323           6 :         else if (res->status == WALRCV_ERROR &&
    2324           4 :                  missing_ok &&
    2325           4 :                  res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
    2326             :         {
    2327             :             /* LOG. Error, but missing_ok = true. */
    2328           4 :             ereport(LOG,
    2329             :                     (errmsg("could not drop replication slot \"%s\" on publisher: %s",
    2330             :                             slotname, res->err)));
    2331             :         }
    2332             :         else
    2333             :         {
    2334             :             /* ERROR. */
    2335           2 :             ereport(ERROR,
    2336             :                     (errcode(ERRCODE_CONNECTION_FAILURE),
    2337             :                      errmsg("could not drop replication slot \"%s\" on publisher: %s",
    2338             :                             slotname, res->err)));
    2339             :         }
    2340             : 
    2341         538 :         walrcv_clear_result(res);
    2342             :     }
    2343           2 :     PG_FINALLY();
    2344             :     {
    2345         540 :         pfree(cmd.data);
    2346             :     }
    2347         540 :     PG_END_TRY();
    2348         538 : }
    2349             : 
    2350             : /*
    2351             :  * Internal workhorse for changing a subscription owner
    2352             :  */
    2353             : static void
    2354          18 : AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
    2355             : {
    2356             :     Form_pg_subscription form;
    2357             :     AclResult   aclresult;
    2358             : 
    2359          18 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    2360             : 
    2361          18 :     if (form->subowner == newOwnerId)
    2362           4 :         return;
    2363             : 
    2364          14 :     if (!object_ownercheck(SubscriptionRelationId, form->oid, GetUserId()))
    2365           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
    2366           0 :                        NameStr(form->subname));
    2367             : 
    2368             :     /*
    2369             :      * Don't allow non-superuser modification of a subscription with
    2370             :      * password_required=false.
    2371             :      */
    2372          14 :     if (!form->subpasswordrequired && !superuser())
    2373           0 :         ereport(ERROR,
    2374             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    2375             :                  errmsg("password_required=false is superuser-only"),
    2376             :                  errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
    2377             : 
    2378             :     /* Must be able to become new owner */
    2379          14 :     check_can_set_role(GetUserId(), newOwnerId);
    2380             : 
    2381             :     /*
    2382             :      * current owner must have CREATE on database
    2383             :      *
    2384             :      * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
    2385             :      * other object types behave differently (e.g. you can't give a table to a
    2386             :      * user who lacks CREATE privileges on a schema).
    2387             :      */
    2388           8 :     aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
    2389             :                                 GetUserId(), ACL_CREATE);
    2390           8 :     if (aclresult != ACLCHECK_OK)
    2391           0 :         aclcheck_error(aclresult, OBJECT_DATABASE,
    2392           0 :                        get_database_name(MyDatabaseId));
    2393             : 
    2394           8 :     form->subowner = newOwnerId;
    2395           8 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    2396             : 
    2397             :     /* Update owner dependency reference */
    2398           8 :     changeDependencyOnOwner(SubscriptionRelationId,
    2399             :                             form->oid,
    2400             :                             newOwnerId);
    2401             : 
    2402           8 :     InvokeObjectPostAlterHook(SubscriptionRelationId,
    2403             :                               form->oid, 0);
    2404             : 
    2405             :     /* Wake up related background processes to handle this change quickly. */
    2406           8 :     ApplyLauncherWakeupAtCommit();
    2407           8 :     LogicalRepWorkersWakeupAtCommit(form->oid);
    2408             : }
    2409             : 
    2410             : /*
    2411             :  * Change subscription owner -- by name
    2412             :  */
    2413             : ObjectAddress
    2414          18 : AlterSubscriptionOwner(const char *name, Oid newOwnerId)
    2415             : {
    2416             :     Oid         subid;
    2417             :     HeapTuple   tup;
    2418             :     Relation    rel;
    2419             :     ObjectAddress address;
    2420             :     Form_pg_subscription form;
    2421             : 
    2422          18 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    2423             : 
    2424          18 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
    2425             :                               CStringGetDatum(name));
    2426             : 
    2427          18 :     if (!HeapTupleIsValid(tup))
    2428           0 :         ereport(ERROR,
    2429             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2430             :                  errmsg("subscription \"%s\" does not exist", name)));
    2431             : 
    2432          18 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    2433          18 :     subid = form->oid;
    2434             : 
    2435          18 :     AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
    2436             : 
    2437          12 :     ObjectAddressSet(address, SubscriptionRelationId, subid);
    2438             : 
    2439          12 :     heap_freetuple(tup);
    2440             : 
    2441          12 :     table_close(rel, RowExclusiveLock);
    2442             : 
    2443          12 :     return address;
    2444             : }
    2445             : 
    2446             : /*
    2447             :  * Change subscription owner -- by OID
    2448             :  */
    2449             : void
    2450           0 : AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
    2451             : {
    2452             :     HeapTuple   tup;
    2453             :     Relation    rel;
    2454             : 
    2455           0 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    2456             : 
    2457           0 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
    2458             : 
    2459           0 :     if (!HeapTupleIsValid(tup))
    2460           0 :         ereport(ERROR,
    2461             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2462             :                  errmsg("subscription with OID %u does not exist", subid)));
    2463             : 
    2464           0 :     AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
    2465             : 
    2466           0 :     heap_freetuple(tup);
    2467             : 
    2468           0 :     table_close(rel, RowExclusiveLock);
    2469           0 : }
    2470             : 
    2471             : /*
    2472             :  * Check and log a warning if the publisher has subscribed to the same table,
    2473             :  * its partition ancestors (if it's a partition), or its partition children (if
    2474             :  * it's a partitioned table), from some other publishers. This check is
    2475             :  * required in the following scenarios:
    2476             :  *
    2477             :  * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
    2478             :  *    statements with "copy_data = true" and "origin = none":
    2479             :  *    - Warn the user that data with an origin might have been copied.
    2480             :  *    - This check is skipped for tables already added, as incremental sync via
    2481             :  *      WAL allows origin tracking. The list of such tables is in
    2482             :  *      subrel_local_oids.
    2483             :  *
    2484             :  * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
    2485             :  *    statements with "retain_dead_tuples = true" and "origin = any", and for
    2486             :  *    ALTER SUBSCRIPTION statements that modify retain_dead_tuples or origin,
    2487             :  *    or when the publisher's status changes (e.g., due to a connection string
    2488             :  *    update):
    2489             :  *    - Warn the user that only conflict detection info for local changes on
    2490             :  *      the publisher is retained. Data from other origins may lack sufficient
    2491             :  *      details for reliable conflict detection.
    2492             :  *    - See comments atop worker.c for more details.
    2493             :  */
    2494             : static void
    2495         338 : check_publications_origin_tables(WalReceiverConn *wrconn, List *publications,
    2496             :                                  bool copydata, bool retain_dead_tuples,
    2497             :                                  char *origin, Oid *subrel_local_oids,
    2498             :                                  int subrel_count, char *subname)
    2499             : {
    2500             :     WalRcvExecResult *res;
    2501             :     StringInfoData cmd;
    2502             :     TupleTableSlot *slot;
    2503         338 :     Oid         tableRow[1] = {TEXTOID};
    2504         338 :     List       *publist = NIL;
    2505             :     int         i;
    2506             :     bool        check_rdt;
    2507             :     bool        check_table_sync;
    2508         676 :     bool        origin_none = origin &&
    2509         338 :         pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0;
    2510             : 
    2511             :     /*
    2512             :      * Enable retain_dead_tuples checks only when origin is set to 'any',
    2513             :      * since with origin='none' only local changes are replicated to the
    2514             :      * subscriber.
    2515             :      */
    2516         338 :     check_rdt = retain_dead_tuples && !origin_none;
    2517             : 
    2518             :     /*
    2519             :      * Enable table synchronization checks only when origin is 'none', to
    2520             :      * ensure that data from other origins is not inadvertently copied.
    2521             :      */
    2522         338 :     check_table_sync = copydata && origin_none;
    2523             : 
    2524             :     /* retain_dead_tuples and table sync checks occur separately */
    2525             :     Assert(!(check_rdt && check_table_sync));
    2526             : 
    2527             :     /* Return if no checks are required */
    2528         338 :     if (!check_rdt && !check_table_sync)
    2529         310 :         return;
    2530             : 
    2531          28 :     initStringInfo(&cmd);
    2532          28 :     appendStringInfoString(&cmd,
    2533             :                            "SELECT DISTINCT P.pubname AS pubname\n"
    2534             :                            "FROM pg_publication P,\n"
    2535             :                            "     LATERAL pg_get_publication_tables(P.pubname) GPT\n"
    2536             :                            "     JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid OR"
    2537             :                            "     GPT.relid IN (SELECT relid FROM pg_partition_ancestors(PS.srrelid) UNION"
    2538             :                            "                   SELECT relid FROM pg_partition_tree(PS.srrelid))),\n"
    2539             :                            "     pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
    2540             :                            "WHERE C.oid = GPT.relid AND P.pubname IN (");
    2541          28 :     GetPublicationsStr(publications, &cmd, true);
    2542          28 :     appendStringInfoString(&cmd, ")\n");
    2543             : 
    2544             :     /*
    2545             :      * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
    2546             :      * subrel_local_oids contains the list of relation oids that are already
    2547             :      * present on the subscriber. This check should be skipped for these
    2548             :      * tables if checking for table sync scenario. However, when handling the
    2549             :      * retain_dead_tuples scenario, ensure all tables are checked, as some
    2550             :      * existing tables may now include changes from other origins due to newly
    2551             :      * created subscriptions on the publisher.
    2552             :      */
    2553          28 :     if (check_table_sync)
    2554             :     {
    2555          28 :         for (i = 0; i < subrel_count; i++)
    2556             :         {
    2557           8 :             Oid         relid = subrel_local_oids[i];
    2558           8 :             char       *schemaname = get_namespace_name(get_rel_namespace(relid));
    2559           8 :             char       *tablename = get_rel_name(relid);
    2560             : 
    2561           8 :             appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
    2562             :                              schemaname, tablename);
    2563             :         }
    2564             :     }
    2565             : 
    2566          28 :     res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
    2567          28 :     pfree(cmd.data);
    2568             : 
    2569          28 :     if (res->status != WALRCV_OK_TUPLES)
    2570           0 :         ereport(ERROR,
    2571             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    2572             :                  errmsg("could not receive list of replicated tables from the publisher: %s",
    2573             :                         res->err)));
    2574             : 
    2575             :     /* Process publications. */
    2576          28 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    2577          38 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    2578             :     {
    2579             :         char       *pubname;
    2580             :         bool        isnull;
    2581             : 
    2582          10 :         pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
    2583             :         Assert(!isnull);
    2584             : 
    2585          10 :         ExecClearTuple(slot);
    2586          10 :         publist = list_append_unique(publist, makeString(pubname));
    2587             :     }
    2588             : 
    2589             :     /*
    2590             :      * Log a warning if the publisher has subscribed to the same table from
    2591             :      * some other publisher. We cannot know the origin of data during the
    2592             :      * initial sync. Data origins can be found only from the WAL by looking at
    2593             :      * the origin id.
    2594             :      *
    2595             :      * XXX: For simplicity, we don't check whether the table has any data or
    2596             :      * not. If the table doesn't have any data then we don't need to
    2597             :      * distinguish between data having origin and data not having origin so we
    2598             :      * can avoid logging a warning for table sync scenario.
    2599             :      */
    2600          28 :     if (publist)
    2601             :     {
    2602             :         StringInfoData pubnames;
    2603             : 
    2604             :         /* Prepare the list of publication(s) for warning message. */
    2605          10 :         initStringInfo(&pubnames);
    2606          10 :         GetPublicationsStr(publist, &pubnames, false);
    2607             : 
    2608          10 :         if (check_table_sync)
    2609           8 :             ereport(WARNING,
    2610             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2611             :                     errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
    2612             :                            subname),
    2613             :                     errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
    2614             :                                      "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
    2615             :                                      list_length(publist), pubnames.data),
    2616             :                     errhint("Verify that initial data copied from the publisher tables did not come from other origins."));
    2617             :         else
    2618           2 :             ereport(WARNING,
    2619             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2620             :                     errmsg("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins",
    2621             :                            subname),
    2622             :                     errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
    2623             :                                      "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
    2624             :                                      list_length(publist), pubnames.data),
    2625             :                     errhint("Consider using origin = NONE or disabling retain_dead_tuples."));
    2626             :     }
    2627             : 
    2628          28 :     ExecDropSingleTupleTableSlot(slot);
    2629             : 
    2630          28 :     walrcv_clear_result(res);
    2631             : }
    2632             : 
    2633             : /*
    2634             :  * This function is similar to check_publications_origin_tables and serves
    2635             :  * same purpose for sequences.
    2636             :  */
    2637             : static void
    2638         314 : check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications,
    2639             :                                     bool copydata, char *origin,
    2640             :                                     Oid *subrel_local_oids, int subrel_count,
    2641             :                                     char *subname)
    2642             : {
    2643             :     WalRcvExecResult *res;
    2644             :     StringInfoData cmd;
    2645             :     TupleTableSlot *slot;
    2646         314 :     Oid         tableRow[1] = {TEXTOID};
    2647         314 :     List       *publist = NIL;
    2648             : 
    2649             :     /*
    2650             :      * Enable sequence synchronization checks only when origin is 'none' , to
    2651             :      * ensure that sequence data from other origins is not inadvertently
    2652             :      * copied.
    2653             :      */
    2654         314 :     if (!copydata || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0)
    2655         294 :         return;
    2656             : 
    2657          20 :     initStringInfo(&cmd);
    2658          20 :     appendStringInfoString(&cmd,
    2659             :                            "SELECT DISTINCT P.pubname AS pubname\n"
    2660             :                            "FROM pg_publication P,\n"
    2661             :                            "     LATERAL pg_get_publication_sequences(P.pubname) GPS\n"
    2662             :                            "     JOIN pg_subscription_rel PS ON (GPS.relid = PS.srrelid),\n"
    2663             :                            "     pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
    2664             :                            "WHERE C.oid = GPS.relid AND P.pubname IN (");
    2665             : 
    2666          20 :     GetPublicationsStr(publications, &cmd, true);
    2667          20 :     appendStringInfoString(&cmd, ")\n");
    2668             : 
    2669             :     /*
    2670             :      * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
    2671             :      * subrel_local_oids contains the list of relations that are already
    2672             :      * present on the subscriber. This check should be skipped as these will
    2673             :      * not be re-synced.
    2674             :      */
    2675          20 :     for (int i = 0; i < subrel_count; i++)
    2676             :     {
    2677           0 :         Oid         relid = subrel_local_oids[i];
    2678           0 :         char       *schemaname = get_namespace_name(get_rel_namespace(relid));
    2679           0 :         char       *seqname = get_rel_name(relid);
    2680             : 
    2681           0 :         appendStringInfo(&cmd,
    2682             :                          "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
    2683             :                          schemaname, seqname);
    2684             :     }
    2685             : 
    2686          20 :     res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
    2687          20 :     pfree(cmd.data);
    2688             : 
    2689          20 :     if (res->status != WALRCV_OK_TUPLES)
    2690           0 :         ereport(ERROR,
    2691             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    2692             :                  errmsg("could not receive list of replicated sequences from the publisher: %s",
    2693             :                         res->err)));
    2694             : 
    2695             :     /* Process publications. */
    2696          20 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    2697          20 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    2698             :     {
    2699             :         char       *pubname;
    2700             :         bool        isnull;
    2701             : 
    2702           0 :         pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
    2703             :         Assert(!isnull);
    2704             : 
    2705           0 :         ExecClearTuple(slot);
    2706           0 :         publist = list_append_unique(publist, makeString(pubname));
    2707             :     }
    2708             : 
    2709             :     /*
    2710             :      * Log a warning if the publisher has subscribed to the same sequence from
    2711             :      * some other publisher. We cannot know the origin of sequences data
    2712             :      * during the initial sync.
    2713             :      */
    2714          20 :     if (publist)
    2715             :     {
    2716             :         StringInfoData pubnames;
    2717             : 
    2718             :         /* Prepare the list of publication(s) for warning message. */
    2719           0 :         initStringInfo(&pubnames);
    2720           0 :         GetPublicationsStr(publist, &pubnames, false);
    2721             : 
    2722           0 :         ereport(WARNING,
    2723             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2724             :                 errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
    2725             :                        subname),
    2726             :                 errdetail_plural("The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.",
    2727             :                                  "The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.",
    2728             :                                  list_length(publist), pubnames.data),
    2729             :                 errhint("Verify that initial data copied from the publisher sequences did not come from other origins."));
    2730             :     }
    2731             : 
    2732          20 :     ExecDropSingleTupleTableSlot(slot);
    2733             : 
    2734          20 :     walrcv_clear_result(res);
    2735             : }
    2736             : 
    2737             : /*
    2738             :  * Determine whether the retain_dead_tuples can be enabled based on the
    2739             :  * publisher's status.
    2740             :  *
    2741             :  * This option is disallowed if the publisher is running a version earlier
    2742             :  * than the PG19, or if the publisher is in recovery (i.e., it is a standby
    2743             :  * server).
    2744             :  *
    2745             :  * See comments atop worker.c for a detailed explanation.
    2746             :  */
    2747             : static void
    2748          24 : check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
    2749             : {
    2750             :     WalRcvExecResult *res;
    2751          24 :     Oid         RecoveryRow[1] = {BOOLOID};
    2752             :     TupleTableSlot *slot;
    2753             :     bool        isnull;
    2754             :     bool        remote_in_recovery;
    2755             : 
    2756          24 :     if (walrcv_server_version(wrconn) < 19000)
    2757           0 :         ereport(ERROR,
    2758             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2759             :                 errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
    2760             : 
    2761          24 :     res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow);
    2762             : 
    2763          24 :     if (res->status != WALRCV_OK_TUPLES)
    2764           0 :         ereport(ERROR,
    2765             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    2766             :                  errmsg("could not obtain recovery progress from the publisher: %s",
    2767             :                         res->err)));
    2768             : 
    2769          24 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    2770          24 :     if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    2771           0 :         elog(ERROR, "failed to fetch tuple for the recovery progress");
    2772             : 
    2773          24 :     remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull));
    2774             : 
    2775          24 :     if (remote_in_recovery)
    2776           0 :         ereport(ERROR,
    2777             :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2778             :                 errmsg("cannot enable retain_dead_tuples if the publisher is in recovery."));
    2779             : 
    2780          24 :     ExecDropSingleTupleTableSlot(slot);
    2781             : 
    2782          24 :     walrcv_clear_result(res);
    2783          24 : }
    2784             : 
    2785             : /*
    2786             :  * Check if the subscriber's configuration is adequate to enable the
    2787             :  * retain_dead_tuples option.
    2788             :  *
    2789             :  * Issue an ERROR if the wal_level does not support the use of replication
    2790             :  * slots when check_guc is set to true.
    2791             :  *
    2792             :  * Issue a WARNING if track_commit_timestamp is not enabled when check_guc is
    2793             :  * set to true. This is only to highlight the importance of enabling
    2794             :  * track_commit_timestamp instead of catching all the misconfigurations, as
    2795             :  * this setting can be adjusted after subscription creation. Without it, the
    2796             :  * apply worker will simply skip conflict detection.
    2797             :  *
    2798             :  * Issue a WARNING or NOTICE if the subscription is disabled and the retention
    2799             :  * is active. Do not raise an ERROR since users can only modify
    2800             :  * retain_dead_tuples for disabled subscriptions. And as long as the
    2801             :  * subscription is enabled promptly, it will not pose issues.
    2802             :  *
    2803             :  * Issue a NOTICE to inform users that max_retention_duration is
    2804             :  * ineffective when retain_dead_tuples is disabled for a subscription. An ERROR
    2805             :  * is not issued because setting max_retention_duration causes no harm,
    2806             :  * even when it is ineffective.
    2807             :  */
    2808             : void
    2809         480 : CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
    2810             :                            int elevel_for_sub_disabled,
    2811             :                            bool retain_dead_tuples, bool retention_active,
    2812             :                            bool max_retention_set)
    2813             : {
    2814             :     Assert(elevel_for_sub_disabled == NOTICE ||
    2815             :            elevel_for_sub_disabled == WARNING);
    2816             : 
    2817         480 :     if (retain_dead_tuples)
    2818             :     {
    2819          34 :         if (check_guc && wal_level < WAL_LEVEL_REPLICA)
    2820           0 :             ereport(ERROR,
    2821             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2822             :                     errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
    2823             :                     errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
    2824             : 
    2825          34 :         if (check_guc && !track_commit_timestamp)
    2826           8 :             ereport(WARNING,
    2827             :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2828             :                     errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
    2829             :                     errhint("Consider setting \"%s\" to true.",
    2830             :                             "track_commit_timestamp"));
    2831             : 
    2832          34 :         if (sub_disabled && retention_active)
    2833          14 :             ereport(elevel_for_sub_disabled,
    2834             :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2835             :                     errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
    2836             :                     (elevel_for_sub_disabled > NOTICE)
    2837             :                     ? errhint("Consider setting %s to false.",
    2838             :                               "retain_dead_tuples") : 0);
    2839             :     }
    2840         446 :     else if (max_retention_set)
    2841             :     {
    2842           6 :         ereport(NOTICE,
    2843             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2844             :                 errmsg("max_retention_duration is ineffective when retain_dead_tuples is disabled"));
    2845             :     }
    2846         480 : }
    2847             : 
    2848             : /*
    2849             :  * Return true iff 'rv' is a member of the list.
    2850             :  */
    2851             : static bool
    2852         544 : list_member_rangevar(const List *list, RangeVar *rv)
    2853             : {
    2854        2000 :     foreach_ptr(PublicationRelKind, relinfo, list)
    2855             :     {
    2856         916 :         if (equal(relinfo->rv, rv))
    2857           2 :             return true;
    2858             :     }
    2859             : 
    2860         542 :     return false;
    2861             : }
    2862             : 
    2863             : /*
    2864             :  * Get the list of tables and sequences which belong to specified publications
    2865             :  * on the publisher connection.
    2866             :  *
    2867             :  * Note that we don't support the case where the column list is different for
    2868             :  * the same table in different publications to avoid sending unwanted column
    2869             :  * information for some of the rows. This can happen when both the column
    2870             :  * list and row filter are specified for different publications.
    2871             :  */
    2872             : static List *
    2873         312 : fetch_relation_list(WalReceiverConn *wrconn, List *publications)
    2874             : {
    2875             :     WalRcvExecResult *res;
    2876             :     StringInfoData cmd;
    2877             :     TupleTableSlot *slot;
    2878         312 :     Oid         tableRow[4] = {TEXTOID, TEXTOID, CHAROID, InvalidOid};
    2879         312 :     List       *relationlist = NIL;
    2880         312 :     int         server_version = walrcv_server_version(wrconn);
    2881         312 :     bool        check_columnlist = (server_version >= 150000);
    2882         312 :     int         column_count = check_columnlist ? 4 : 3;
    2883             :     StringInfoData pub_names;
    2884             : 
    2885         312 :     initStringInfo(&cmd);
    2886         312 :     initStringInfo(&pub_names);
    2887             : 
    2888             :     /* Build the pub_names comma-separated string. */
    2889         312 :     GetPublicationsStr(publications, &pub_names, true);
    2890             : 
    2891             :     /* Get the list of relations from the publisher */
    2892         312 :     if (server_version >= 160000)
    2893             :     {
    2894         312 :         tableRow[3] = INT2VECTOROID;
    2895             : 
    2896             :         /*
    2897             :          * From version 16, we allowed passing multiple publications to the
    2898             :          * function pg_get_publication_tables. This helped to filter out the
    2899             :          * partition table whose ancestor is also published in this
    2900             :          * publication array.
    2901             :          *
    2902             :          * Join pg_get_publication_tables with pg_publication to exclude
    2903             :          * non-existing publications.
    2904             :          *
    2905             :          * Note that attrs are always stored in sorted order so we don't need
    2906             :          * to worry if different publications have specified them in a
    2907             :          * different order. See pub_collist_validate.
    2908             :          */
    2909         312 :         appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, c.relkind, gpt.attrs\n"
    2910             :                          "   FROM pg_class c\n"
    2911             :                          "         JOIN pg_namespace n ON n.oid = c.relnamespace\n"
    2912             :                          "         JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
    2913             :                          "                FROM pg_publication\n"
    2914             :                          "                WHERE pubname IN ( %s )) AS gpt\n"
    2915             :                          "             ON gpt.relid = c.oid\n",
    2916             :                          pub_names.data);
    2917             : 
    2918             :         /* From version 19, inclusion of sequences in the target is supported */
    2919         312 :         if (server_version >= 190000)
    2920         312 :             appendStringInfo(&cmd,
    2921             :                              "UNION ALL\n"
    2922             :                              "  SELECT DISTINCT s.schemaname, s.sequencename, " CppAsString2(RELKIND_SEQUENCE) "::\"char\" AS relkind, NULL::int2vector AS attrs\n"
    2923             :                              "  FROM pg_catalog.pg_publication_sequences s\n"
    2924             :                              "  WHERE s.pubname IN ( %s )",
    2925             :                              pub_names.data);
    2926             :     }
    2927             :     else
    2928             :     {
    2929           0 :         tableRow[3] = NAMEARRAYOID;
    2930           0 :         appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, " CppAsString2(RELKIND_RELATION) "::\"char\" AS relkind \n");
    2931             : 
    2932             :         /* Get column lists for each relation if the publisher supports it */
    2933           0 :         if (check_columnlist)
    2934           0 :             appendStringInfoString(&cmd, ", t.attnames\n");
    2935             : 
    2936           0 :         appendStringInfo(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
    2937             :                          " WHERE t.pubname IN ( %s )",
    2938             :                          pub_names.data);
    2939             :     }
    2940             : 
    2941         312 :     pfree(pub_names.data);
    2942             : 
    2943         312 :     res = walrcv_exec(wrconn, cmd.data, column_count, tableRow);
    2944         312 :     pfree(cmd.data);
    2945             : 
    2946         312 :     if (res->status != WALRCV_OK_TUPLES)
    2947           0 :         ereport(ERROR,
    2948             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    2949             :                  errmsg("could not receive list of replicated tables from the publisher: %s",
    2950             :                         res->err)));
    2951             : 
    2952             :     /* Process tables. */
    2953         312 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    2954         888 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    2955             :     {
    2956             :         char       *nspname;
    2957             :         char       *relname;
    2958             :         bool        isnull;
    2959             :         char        relkind;
    2960         578 :         PublicationRelKind *relinfo = palloc_object(PublicationRelKind);
    2961             : 
    2962         578 :         nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
    2963             :         Assert(!isnull);
    2964         578 :         relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
    2965             :         Assert(!isnull);
    2966         578 :         relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
    2967             :         Assert(!isnull);
    2968             : 
    2969         578 :         relinfo->rv = makeRangeVar(nspname, relname, -1);
    2970         578 :         relinfo->relkind = relkind;
    2971             : 
    2972         578 :         if (relkind != RELKIND_SEQUENCE &&
    2973         544 :             check_columnlist &&
    2974         544 :             list_member_rangevar(relationlist, relinfo->rv))
    2975           2 :             ereport(ERROR,
    2976             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2977             :                     errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
    2978             :                            nspname, relname));
    2979             :         else
    2980         576 :             relationlist = lappend(relationlist, relinfo);
    2981             : 
    2982         576 :         ExecClearTuple(slot);
    2983             :     }
    2984         310 :     ExecDropSingleTupleTableSlot(slot);
    2985             : 
    2986         310 :     walrcv_clear_result(res);
    2987             : 
    2988         310 :     return relationlist;
    2989             : }
    2990             : 
    2991             : /*
    2992             :  * This is to report the connection failure while dropping replication slots.
    2993             :  * Here, we report the WARNING for all tablesync slots so that user can drop
    2994             :  * them manually, if required.
    2995             :  */
    2996             : static void
    2997           0 : ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
    2998             : {
    2999             :     ListCell   *lc;
    3000             : 
    3001           0 :     foreach(lc, rstates)
    3002             :     {
    3003           0 :         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
    3004           0 :         Oid         relid = rstate->relid;
    3005             : 
    3006             :         /* Only cleanup resources of tablesync workers */
    3007           0 :         if (!OidIsValid(relid))
    3008           0 :             continue;
    3009             : 
    3010             :         /*
    3011             :          * Caller needs to ensure that relstate doesn't change underneath us.
    3012             :          * See DropSubscription where we get the relstates.
    3013             :          */
    3014           0 :         if (rstate->state != SUBREL_STATE_SYNCDONE)
    3015             :         {
    3016           0 :             char        syncslotname[NAMEDATALEN] = {0};
    3017             : 
    3018           0 :             ReplicationSlotNameForTablesync(subid, relid, syncslotname,
    3019             :                                             sizeof(syncslotname));
    3020           0 :             elog(WARNING, "could not drop tablesync replication slot \"%s\"",
    3021             :                  syncslotname);
    3022             :         }
    3023             :     }
    3024             : 
    3025           0 :     ereport(ERROR,
    3026             :             (errcode(ERRCODE_CONNECTION_FAILURE),
    3027             :              errmsg("could not connect to publisher when attempting to drop replication slot \"%s\": %s",
    3028             :                     slotname, err),
    3029             :     /* translator: %s is an SQL ALTER command */
    3030             :              errhint("Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
    3031             :                      "ALTER SUBSCRIPTION ... DISABLE",
    3032             :                      "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
    3033             : }
    3034             : 
    3035             : /*
    3036             :  * Check for duplicates in the given list of publications and error out if
    3037             :  * found one.  Add publications to datums as text datums, if datums is not
    3038             :  * NULL.
    3039             :  */
    3040             : static void
    3041         448 : check_duplicates_in_publist(List *publist, Datum *datums)
    3042             : {
    3043             :     ListCell   *cell;
    3044         448 :     int         j = 0;
    3045             : 
    3046        1024 :     foreach(cell, publist)
    3047             :     {
    3048         594 :         char       *name = strVal(lfirst(cell));
    3049             :         ListCell   *pcell;
    3050             : 
    3051         896 :         foreach(pcell, publist)
    3052             :         {
    3053         896 :             char       *pname = strVal(lfirst(pcell));
    3054             : 
    3055         896 :             if (pcell == cell)
    3056         576 :                 break;
    3057             : 
    3058         320 :             if (strcmp(name, pname) == 0)
    3059          18 :                 ereport(ERROR,
    3060             :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
    3061             :                          errmsg("publication name \"%s\" used more than once",
    3062             :                                 pname)));
    3063             :         }
    3064             : 
    3065         576 :         if (datums)
    3066         490 :             datums[j++] = CStringGetTextDatum(name);
    3067             :     }
    3068         430 : }
    3069             : 
    3070             : /*
    3071             :  * Merge current subscription's publications and user-specified publications
    3072             :  * from ADD/DROP PUBLICATIONS.
    3073             :  *
    3074             :  * If addpub is true, we will add the list of publications into oldpublist.
    3075             :  * Otherwise, we will delete the list of publications from oldpublist.  The
    3076             :  * returned list is a copy, oldpublist itself is not changed.
    3077             :  *
    3078             :  * subname is the subscription name, for error messages.
    3079             :  */
    3080             : static List *
    3081          54 : merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
    3082             : {
    3083             :     ListCell   *lc;
    3084             : 
    3085          54 :     oldpublist = list_copy(oldpublist);
    3086             : 
    3087          54 :     check_duplicates_in_publist(newpublist, NULL);
    3088             : 
    3089          92 :     foreach(lc, newpublist)
    3090             :     {
    3091          68 :         char       *name = strVal(lfirst(lc));
    3092             :         ListCell   *lc2;
    3093          68 :         bool        found = false;
    3094             : 
    3095         134 :         foreach(lc2, oldpublist)
    3096             :         {
    3097         110 :             char       *pubname = strVal(lfirst(lc2));
    3098             : 
    3099         110 :             if (strcmp(name, pubname) == 0)
    3100             :             {
    3101          44 :                 found = true;
    3102          44 :                 if (addpub)
    3103          12 :                     ereport(ERROR,
    3104             :                             (errcode(ERRCODE_DUPLICATE_OBJECT),
    3105             :                              errmsg("publication \"%s\" is already in subscription \"%s\"",
    3106             :                                     name, subname)));
    3107             :                 else
    3108          32 :                     oldpublist = foreach_delete_current(oldpublist, lc2);
    3109             : 
    3110          32 :                 break;
    3111             :             }
    3112             :         }
    3113             : 
    3114          56 :         if (addpub && !found)
    3115          18 :             oldpublist = lappend(oldpublist, makeString(name));
    3116          38 :         else if (!addpub && !found)
    3117           6 :             ereport(ERROR,
    3118             :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    3119             :                      errmsg("publication \"%s\" is not in subscription \"%s\"",
    3120             :                             name, subname)));
    3121             :     }
    3122             : 
    3123             :     /*
    3124             :      * XXX Probably no strong reason for this, but for now it's to make ALTER
    3125             :      * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
    3126             :      */
    3127          24 :     if (!oldpublist)
    3128           6 :         ereport(ERROR,
    3129             :                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    3130             :                  errmsg("cannot drop all the publications from a subscription")));
    3131             : 
    3132          18 :     return oldpublist;
    3133             : }
    3134             : 
    3135             : /*
    3136             :  * Extract the streaming mode value from a DefElem.  This is like
    3137             :  * defGetBoolean() but also accepts the special value of "parallel".
    3138             :  */
    3139             : char
    3140         854 : defGetStreamingMode(DefElem *def)
    3141             : {
    3142             :     /*
    3143             :      * If no parameter value given, assume "true" is meant.
    3144             :      */
    3145         854 :     if (!def->arg)
    3146           0 :         return LOGICALREP_STREAM_ON;
    3147             : 
    3148             :     /*
    3149             :      * Allow 0, 1, "false", "true", "off", "on" or "parallel".
    3150             :      */
    3151         854 :     switch (nodeTag(def->arg))
    3152             :     {
    3153           0 :         case T_Integer:
    3154           0 :             switch (intVal(def->arg))
    3155             :             {
    3156           0 :                 case 0:
    3157           0 :                     return LOGICALREP_STREAM_OFF;
    3158           0 :                 case 1:
    3159           0 :                     return LOGICALREP_STREAM_ON;
    3160           0 :                 default:
    3161             :                     /* otherwise, error out below */
    3162           0 :                     break;
    3163             :             }
    3164           0 :             break;
    3165         854 :         default:
    3166             :             {
    3167         854 :                 char       *sval = defGetString(def);
    3168             : 
    3169             :                 /*
    3170             :                  * The set of strings accepted here should match up with the
    3171             :                  * grammar's opt_boolean_or_string production.
    3172             :                  */
    3173        1702 :                 if (pg_strcasecmp(sval, "false") == 0 ||
    3174         848 :                     pg_strcasecmp(sval, "off") == 0)
    3175          12 :                     return LOGICALREP_STREAM_OFF;
    3176        1666 :                 if (pg_strcasecmp(sval, "true") == 0 ||
    3177         824 :                     pg_strcasecmp(sval, "on") == 0)
    3178          88 :                     return LOGICALREP_STREAM_ON;
    3179         754 :                 if (pg_strcasecmp(sval, "parallel") == 0)
    3180         748 :                     return LOGICALREP_STREAM_PARALLEL;
    3181             :             }
    3182           6 :             break;
    3183             :     }
    3184             : 
    3185           6 :     ereport(ERROR,
    3186             :             (errcode(ERRCODE_SYNTAX_ERROR),
    3187             :              errmsg("%s requires a Boolean value or \"parallel\"",
    3188             :                     def->defname)));
    3189             :     return LOGICALREP_STREAM_OFF;   /* keep compiler quiet */
    3190             : }

Generated by: LCOV version 1.16