LCOV - code coverage report
Current view: top level - src/backend/commands - subscriptioncmds.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 86.7 % 1090 945
Test Date: 2026-03-10 15:14:48 Functions: 95.7 % 23 22
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * subscriptioncmds.c
       4              :  *      subscription catalog manipulation functions
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  * IDENTIFICATION
      10              :  *      src/backend/commands/subscriptioncmds.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres.h"
      16              : 
      17              : #include "access/commit_ts.h"
      18              : #include "access/htup_details.h"
      19              : #include "access/table.h"
      20              : #include "access/twophase.h"
      21              : #include "access/xact.h"
      22              : #include "catalog/catalog.h"
      23              : #include "catalog/dependency.h"
      24              : #include "catalog/indexing.h"
      25              : #include "catalog/namespace.h"
      26              : #include "catalog/objectaccess.h"
      27              : #include "catalog/objectaddress.h"
      28              : #include "catalog/pg_authid_d.h"
      29              : #include "catalog/pg_database_d.h"
      30              : #include "catalog/pg_foreign_server.h"
      31              : #include "catalog/pg_subscription.h"
      32              : #include "catalog/pg_subscription_rel.h"
      33              : #include "catalog/pg_type.h"
      34              : #include "catalog/pg_user_mapping.h"
      35              : #include "commands/defrem.h"
      36              : #include "commands/event_trigger.h"
      37              : #include "commands/subscriptioncmds.h"
      38              : #include "executor/executor.h"
      39              : #include "foreign/foreign.h"
      40              : #include "miscadmin.h"
      41              : #include "nodes/makefuncs.h"
      42              : #include "pgstat.h"
      43              : #include "replication/logicallauncher.h"
      44              : #include "replication/logicalworker.h"
      45              : #include "replication/origin.h"
      46              : #include "replication/slot.h"
      47              : #include "replication/walreceiver.h"
      48              : #include "replication/walsender.h"
      49              : #include "replication/worker_internal.h"
      50              : #include "storage/lmgr.h"
      51              : #include "utils/acl.h"
      52              : #include "utils/builtins.h"
      53              : #include "utils/guc.h"
      54              : #include "utils/lsyscache.h"
      55              : #include "utils/memutils.h"
      56              : #include "utils/pg_lsn.h"
      57              : #include "utils/syscache.h"
      58              : 
      59              : /*
      60              :  * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
      61              :  * command.  Each is a distinct bit so that sets of options fit in a bits32.
      62              :  */
      63              : #define SUBOPT_CONNECT              0x00000001
      64              : #define SUBOPT_ENABLED              0x00000002
      65              : #define SUBOPT_CREATE_SLOT          0x00000004
      66              : #define SUBOPT_SLOT_NAME            0x00000008
      67              : #define SUBOPT_COPY_DATA            0x00000010
      68              : #define SUBOPT_SYNCHRONOUS_COMMIT   0x00000020
      69              : #define SUBOPT_REFRESH              0x00000040
      70              : #define SUBOPT_BINARY               0x00000080
      71              : #define SUBOPT_STREAMING            0x00000100
      72              : #define SUBOPT_TWOPHASE_COMMIT      0x00000200
      73              : #define SUBOPT_DISABLE_ON_ERR       0x00000400
      74              : #define SUBOPT_PASSWORD_REQUIRED    0x00000800
      75              : #define SUBOPT_RUN_AS_OWNER         0x00001000
      76              : #define SUBOPT_FAILOVER             0x00002000
      77              : #define SUBOPT_RETAIN_DEAD_TUPLES   0x00004000
      78              : #define SUBOPT_MAX_RETENTION_DURATION   0x00008000
      79              : #define SUBOPT_WAL_RECEIVER_TIMEOUT         0x00010000
      80              : #define SUBOPT_LSN                  0x00020000
      81              : #define SUBOPT_ORIGIN               0x00040000
      82              : 
      83              : /* true only if every bit of 'bits' is also set in 'val' */
      84              : #define IsSet(val, bits)  (((val) & (bits)) == (bits))
      85              : 
      86              : /*
      87              :  * Bitmap of the user-specified CREATE/ALTER SUBSCRIPTION options plus the
      88              :  * parsed or default value of each; filled by parse_subscription_options().
      89              :  */
      90              : typedef struct SubOpts
      91              : {
      92              :     bits32      specified_opts; /* SUBOPT_* bits of options given explicitly */
      93              :     char       *slot_name; /* NULL if not given or given as "none" */
      94              :     char       *synchronous_commit; /* raw string, tested against the GUC */
      95              :     bool        connect;
      96              :     bool        enabled;
      97              :     bool        create_slot;
      98              :     bool        copy_data;
      99              :     bool        refresh;
     100              :     bool        binary;
     101              :     char        streaming; /* stream mode, e.g. LOGICALREP_STREAM_PARALLEL */
     102              :     bool        twophase;
     103              :     bool        disableonerr;
     104              :     bool        passwordrequired;
     105              :     bool        runasowner;
     106              :     bool        failover;
     107              :     bool        retaindeadtuples;
     108              :     int32       maxretention; /* value of max_retention_duration */
     109              :     char       *origin; /* origin value; defaults to LOGICALREP_ORIGIN_ANY */
     110              :     XLogRecPtr  lsn; /* InvalidXLogRecPtr if given as "none" */
     111              :     char       *wal_receiver_timeout; /* raw string, validated when parsed */
     112              : } SubOpts;
     113              : 
     114              : /*
     115              :  * PublicationRelKind represents a relation included in a publication.
     116              :  * It stores the schema-qualified relation name (rv) and its kind (relkind).
     117              :  */
     118              : typedef struct PublicationRelKind
     119              : {
     120              :     RangeVar   *rv; /* schema-qualified name of the relation */
     121              :     char        relkind; /* kind of that relation */
     122              : } PublicationRelKind;
     123              : 
     124              : static List *fetch_relation_list(WalReceiverConn *wrconn, List *publications);
     125              : static void check_publications_origin_tables(WalReceiverConn *wrconn,
     126              :                                              List *publications, bool copydata,
     127              :                                              bool retain_dead_tuples,
     128              :                                              char *origin,
     129              :                                              Oid *subrel_local_oids,
     130              :                                              int subrel_count, char *subname);
     131              : static void check_publications_origin_sequences(WalReceiverConn *wrconn,
     132              :                                                 List *publications,
     133              :                                                 bool copydata, char *origin,
     134              :                                                 Oid *subrel_local_oids,
     135              :                                                 int subrel_count,
     136              :                                                 char *subname);
     137              : static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn);
     138              : static void check_duplicates_in_publist(List *publist, Datum *datums);
     139              : static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
     140              : static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
     141              : static void CheckAlterSubOption(Subscription *sub, const char *option,
     142              :                                 bool slot_needs_update, bool isTopLevel);
     143              : 
     144              : 
     145              : /*
     146              :  * Parse the option list of a CREATE/ALTER SUBSCRIPTION command into *opts.
     147              :  * Only options whose SUBOPT_* bit is in supported_opts are recognized; any
     148              :  * other name, or a disallowed option combination, is reported as an ERROR.
     149              :  * opts->specified_opts records which options the user explicitly supplied.
     150              :  */
     151              : static void
     152          521 : parse_subscription_options(ParseState *pstate, List *stmt_options,
     153              :                            bits32 supported_opts, SubOpts *opts)
     154              : {
     155              :     ListCell   *lc;
     156              : 
     157              :     /* Start out with cleared opts. */
     158          521 :     memset(opts, 0, sizeof(SubOpts));
     159              : 
     160              :     /* caller must expect some option */
     161              :     Assert(supported_opts != 0);
     162              : 
     163              :     /* If connect option is supported, these others also need to be. */
     164              :     Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
     165              :            IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
     166              :                  SUBOPT_COPY_DATA));
     167              : 
     168              :     /* Set defaults for supported options; false/0 ones restate the memset. */
     169          521 :     if (IsSet(supported_opts, SUBOPT_CONNECT))
     170          261 :         opts->connect = true;
     171          521 :     if (IsSet(supported_opts, SUBOPT_ENABLED))
     172          314 :         opts->enabled = true;
     173          521 :     if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
     174          261 :         opts->create_slot = true;
     175          521 :     if (IsSet(supported_opts, SUBOPT_COPY_DATA))
     176          335 :         opts->copy_data = true;
     177          521 :     if (IsSet(supported_opts, SUBOPT_REFRESH))
     178           43 :         opts->refresh = true;
     179          521 :     if (IsSet(supported_opts, SUBOPT_BINARY))
     180          382 :         opts->binary = false;
     181          521 :     if (IsSet(supported_opts, SUBOPT_STREAMING))
     182          382 :         opts->streaming = LOGICALREP_STREAM_PARALLEL;
     183          521 :     if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
     184          382 :         opts->twophase = false;
     185          521 :     if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
     186          382 :         opts->disableonerr = false;
     187          521 :     if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED))
     188          382 :         opts->passwordrequired = true;
     189          521 :     if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER))
     190          382 :         opts->runasowner = false;
     191          521 :     if (IsSet(supported_opts, SUBOPT_FAILOVER))
     192          382 :         opts->failover = false;
     193          521 :     if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES))
     194          382 :         opts->retaindeadtuples = false;
     195          521 :     if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION))
     196          382 :         opts->maxretention = 0;
     197          521 :     if (IsSet(supported_opts, SUBOPT_ORIGIN))
     198          382 :         opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
     199              : 
     200              :     /* Parse options; unsupported names fall through to the final ERROR. */
     201         1037 :     foreach(lc, stmt_options)
     202              :     {
     203          552 :         DefElem    *defel = (DefElem *) lfirst(lc);
     204              : 
     205          552 :         if (IsSet(supported_opts, SUBOPT_CONNECT) &&
     206          326 :             strcmp(defel->defname, "connect") == 0)
     207              :         {
     208          108 :             if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
     209            0 :                 errorConflictingDefElem(defel, pstate);
     210              : 
     211          108 :             opts->specified_opts |= SUBOPT_CONNECT;
     212          108 :             opts->connect = defGetBoolean(defel);
     213              :         }
     214          444 :         else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
     215          271 :                  strcmp(defel->defname, "enabled") == 0)
     216              :         {
     217           73 :             if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
     218            0 :                 errorConflictingDefElem(defel, pstate);
     219              : 
     220           73 :             opts->specified_opts |= SUBOPT_ENABLED;
     221           73 :             opts->enabled = defGetBoolean(defel);
     222              :         }
     223          371 :         else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
     224          198 :                  strcmp(defel->defname, "create_slot") == 0)
     225              :         {
     226           20 :             if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
     227            0 :                 errorConflictingDefElem(defel, pstate);
     228              : 
     229           20 :             opts->specified_opts |= SUBOPT_CREATE_SLOT;
     230           20 :             opts->create_slot = defGetBoolean(defel);
     231              :         }
     232          351 :         else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
     233          301 :                  strcmp(defel->defname, "slot_name") == 0)
     234              :         {
     235           95 :             if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
     236            0 :                 errorConflictingDefElem(defel, pstate);
     237              : 
     238           95 :             opts->specified_opts |= SUBOPT_SLOT_NAME;
     239           95 :             opts->slot_name = defGetString(defel);
     240              : 
     241              :             /* Setting slot_name = NONE is treated as no slot name. */
     242          187 :             if (strcmp(opts->slot_name, "none") == 0)
     243           74 :                 opts->slot_name = NULL;
     244              :             else
     245           21 :                 ReplicationSlotValidateName(opts->slot_name, false, ERROR);
     246              :         }
     247          256 :         else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
     248          163 :                  strcmp(defel->defname, "copy_data") == 0)
     249              :         {
     250           29 :             if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
     251            0 :                 errorConflictingDefElem(defel, pstate);
     252              : 
     253           29 :             opts->specified_opts |= SUBOPT_COPY_DATA;
     254           29 :             opts->copy_data = defGetBoolean(defel);
     255              :         }
     256          227 :         else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
     257          182 :                  strcmp(defel->defname, "synchronous_commit") == 0)
     258              :         {
     259            6 :             if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
     260            0 :                 errorConflictingDefElem(defel, pstate);
     261              : 
     262            6 :             opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
     263            6 :             opts->synchronous_commit = defGetString(defel);
     264              : 
     265              :             /* Test if the given value is valid for synchronous_commit GUC. */
     266            6 :             (void) set_config_option("synchronous_commit", opts->synchronous_commit,
     267              :                                      PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
     268              :                                      false, 0, false);
     269              :         }
     270          221 :         else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
     271           33 :                  strcmp(defel->defname, "refresh") == 0)
     272              :         {
     273           33 :             if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
     274            0 :                 errorConflictingDefElem(defel, pstate);
     275              : 
     276           33 :             opts->specified_opts |= SUBOPT_REFRESH;
     277           33 :             opts->refresh = defGetBoolean(defel);
     278              :         }
     279          188 :         else if (IsSet(supported_opts, SUBOPT_BINARY) &&
     280          176 :                  strcmp(defel->defname, "binary") == 0)
     281              :         {
     282           16 :             if (IsSet(opts->specified_opts, SUBOPT_BINARY))
     283            0 :                 errorConflictingDefElem(defel, pstate);
     284              : 
     285           16 :             opts->specified_opts |= SUBOPT_BINARY;
     286           16 :             opts->binary = defGetBoolean(defel);
     287              :         }
     288          172 :         else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
     289          160 :                  strcmp(defel->defname, "streaming") == 0)
     290              :         {
     291           37 :             if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
     292            0 :                 errorConflictingDefElem(defel, pstate);
     293              : 
     294           37 :             opts->specified_opts |= SUBOPT_STREAMING;
     295           37 :             opts->streaming = defGetStreamingMode(defel);
     296              :         }
     297          135 :         else if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
     298          123 :                  strcmp(defel->defname, "two_phase") == 0)
     299              :         {
     300           21 :             if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
     301            0 :                 errorConflictingDefElem(defel, pstate);
     302              : 
     303           21 :             opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
     304           21 :             opts->twophase = defGetBoolean(defel);
     305              :         }
     306          114 :         else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
     307          102 :                  strcmp(defel->defname, "disable_on_error") == 0)
     308              :         {
     309           10 :             if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
     310            0 :                 errorConflictingDefElem(defel, pstate);
     311              : 
     312           10 :             opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
     313           10 :             opts->disableonerr = defGetBoolean(defel);
     314              :         }
     315          104 :         else if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED) &&
     316           92 :                  strcmp(defel->defname, "password_required") == 0)
     317              :         {
     318           13 :             if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
     319            0 :                 errorConflictingDefElem(defel, pstate);
     320              : 
     321           13 :             opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
     322           13 :             opts->passwordrequired = defGetBoolean(defel);
     323              :         }
     324           91 :         else if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER) &&
     325           79 :                  strcmp(defel->defname, "run_as_owner") == 0)
     326              :         {
     327            9 :             if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
     328            0 :                 errorConflictingDefElem(defel, pstate);
     329              : 
     330            9 :             opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
     331            9 :             opts->runasowner = defGetBoolean(defel);
     332              :         }
     333           82 :         else if (IsSet(supported_opts, SUBOPT_FAILOVER) &&
     334           70 :                  strcmp(defel->defname, "failover") == 0)
     335              :         {
     336           14 :             if (IsSet(opts->specified_opts, SUBOPT_FAILOVER))
     337            0 :                 errorConflictingDefElem(defel, pstate);
     338              : 
     339           14 :             opts->specified_opts |= SUBOPT_FAILOVER;
     340           14 :             opts->failover = defGetBoolean(defel);
     341              :         }
     342           68 :         else if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES) &&
     343           56 :                  strcmp(defel->defname, "retain_dead_tuples") == 0)
     344              :         {
     345           12 :             if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
     346            0 :                 errorConflictingDefElem(defel, pstate);
     347              : 
     348           12 :             opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
     349           12 :             opts->retaindeadtuples = defGetBoolean(defel);
     350              :         }
     351           56 :         else if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION) &&
     352           44 :                  strcmp(defel->defname, "max_retention_duration") == 0)
     353              :         {
     354           11 :             if (IsSet(opts->specified_opts, SUBOPT_MAX_RETENTION_DURATION))
     355            0 :                 errorConflictingDefElem(defel, pstate);
     356              : 
     357           11 :             opts->specified_opts |= SUBOPT_MAX_RETENTION_DURATION;
     358           11 :             opts->maxretention = defGetInt32(defel);
     359              :         }
     360           45 :         else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
     361           33 :                  strcmp(defel->defname, "origin") == 0)
     362              :         {
     363           21 :             if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
     364            0 :                 errorConflictingDefElem(defel, pstate);
     365              : 
     366           21 :             opts->specified_opts |= SUBOPT_ORIGIN;
     367           21 :             pfree(opts->origin); /* release the default string set above */
     368              : 
     369              :             /*
     370              :              * Even though the "origin" parameter allows only "none" and "any"
     371              :              * values, it is implemented as a string type so that the
     372              :              * parameter can be extended in future versions to support
     373              :              * filtering using origin names specified by the user.
     374              :              */
     375           21 :             opts->origin = defGetString(defel);
     376              : 
     377           29 :             if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
     378            8 :                 (pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
     379            3 :                 ereport(ERROR,
     380              :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     381              :                         errmsg("unrecognized origin value: \"%s\"", opts->origin));
     382              :         }
     383           24 :         else if (IsSet(supported_opts, SUBOPT_LSN) &&
     384           12 :                  strcmp(defel->defname, "lsn") == 0)
     385            9 :         {
     386           12 :             char       *lsn_str = defGetString(defel);
     387              :             XLogRecPtr  lsn;
     388              : 
     389           12 :             if (IsSet(opts->specified_opts, SUBOPT_LSN))
     390            0 :                 errorConflictingDefElem(defel, pstate);
     391              : 
     392              :             /* Setting lsn = NONE is treated as resetting LSN */
     393           12 :             if (strcmp(lsn_str, "none") == 0)
     394            3 :                 lsn = InvalidXLogRecPtr;
     395              :             else
     396              :             {
     397              :                 /* Parse the argument as LSN */
     398            9 :                 lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
     399              :                                                       CStringGetDatum(lsn_str)));
     400              : 
     401            9 :                 if (!XLogRecPtrIsValid(lsn))
     402            3 :                     ereport(ERROR,
     403              :                             (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     404              :                              errmsg("invalid WAL location (LSN): %s", lsn_str)));
     405              :             }
     406              : 
     407            9 :             opts->specified_opts |= SUBOPT_LSN;
     408            9 :             opts->lsn = lsn;
     409              :         }
     410           12 :         else if (IsSet(supported_opts, SUBOPT_WAL_RECEIVER_TIMEOUT) &&
     411           12 :                  strcmp(defel->defname, "wal_receiver_timeout") == 0)
     412            6 :         {
     413              :             bool        parsed;
     414              :             int         val;
     415              : 
     416            9 :             if (IsSet(opts->specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
     417            0 :                 errorConflictingDefElem(defel, pstate);
     418              : 
     419            9 :             opts->specified_opts |= SUBOPT_WAL_RECEIVER_TIMEOUT;
     420            9 :             opts->wal_receiver_timeout = defGetString(defel);
     421              : 
     422              :             /*
     423              :              * Test if the given value is valid for wal_receiver_timeout GUC.
     424              :              * Skip this test if the value is -1, since -1 is allowed for the
     425              :              * wal_receiver_timeout subscription option, but not for the GUC
     426              :              * itself.
     427              :              */
     428            9 :             parsed = parse_int(opts->wal_receiver_timeout, &val, 0, NULL);
     429            9 :             if (!parsed || val != -1) /* unparsable input also reaches the GUC check */
     430            6 :                 (void) set_config_option("wal_receiver_timeout", opts->wal_receiver_timeout,
     431              :                                          PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
     432              :                                          false, 0, false);
     433              :         }
     434              :         else
     435            3 :             ereport(ERROR,
     436              :                     (errcode(ERRCODE_SYNTAX_ERROR),
     437              :                      errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
     438              :     }
     439              : 
     440              :     /*
     441              :      * We've been explicitly asked to not connect, that requires some
     442              :      * additional processing.
     443              :      */
     444          485 :     if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
     445              :     {
     446              :         /* Check for incompatible options from the user. */
     447           87 :         if (opts->enabled &&
     448           87 :             IsSet(opts->specified_opts, SUBOPT_ENABLED))
     449            3 :             ereport(ERROR,
     450              :                     (errcode(ERRCODE_SYNTAX_ERROR),
     451              :             /*- translator: both %s are strings of the form "option = value" */
     452              :                      errmsg("%s and %s are mutually exclusive options",
     453              :                             "connect = false", "enabled = true")));
     454              : 
     455           84 :         if (opts->create_slot &&
     456           81 :             IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
     457            3 :             ereport(ERROR,
     458              :                     (errcode(ERRCODE_SYNTAX_ERROR),
     459              :                      errmsg("%s and %s are mutually exclusive options",
     460              :                             "connect = false", "create_slot = true")));
     461              : 
     462           81 :         if (opts->copy_data &&
     463           78 :             IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
     464            3 :             ereport(ERROR,
     465              :                     (errcode(ERRCODE_SYNTAX_ERROR),
     466              :                      errmsg("%s and %s are mutually exclusive options",
     467              :                             "connect = false", "copy_data = true")));
     468              : 
     469              :         /* None was explicitly set to true (checked above), so turn them off. */
     470           78 :         opts->enabled = false;
     471           78 :         opts->create_slot = false;
     472           78 :         opts->copy_data = false;
     473              :     }
     474              : 
     475              :     /*
     476              :      * Do additional checking for disallowed combination when slot_name = NONE
     477              :      * was used.
     478              :      */
     479          476 :     if (!opts->slot_name &&
     480          458 :         IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
     481              :     {
     482           71 :         if (opts->enabled)
     483              :         {
     484            9 :             if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
     485            3 :                 ereport(ERROR,
     486              :                         (errcode(ERRCODE_SYNTAX_ERROR),
     487              :                 /*- translator: both %s are strings of the form "option = value" */
     488              :                          errmsg("%s and %s are mutually exclusive options",
     489              :                                 "slot_name = NONE", "enabled = true")));
     490              :             else
     491            6 :                 ereport(ERROR,
     492              :                         (errcode(ERRCODE_SYNTAX_ERROR),
     493              :                 /*- translator: both %s are strings of the form "option = value" */
     494              :                          errmsg("subscription with %s must also set %s",
     495              :                                 "slot_name = NONE", "enabled = false")));
     496              :         }
     497              : 
     498           62 :         if (opts->create_slot)
     499              :         {
     500            6 :             if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
     501            3 :                 ereport(ERROR,
     502              :                         (errcode(ERRCODE_SYNTAX_ERROR),
     503              :                 /*- translator: both %s are strings of the form "option = value" */
     504              :                          errmsg("%s and %s are mutually exclusive options",
     505              :                                 "slot_name = NONE", "create_slot = true")));
     506              :             else
     507            3 :                 ereport(ERROR,
     508              :                         (errcode(ERRCODE_SYNTAX_ERROR),
     509              :                 /*- translator: both %s are strings of the form "option = value" */
     510              :                          errmsg("subscription with %s must also set %s",
     511              :                                 "slot_name = NONE", "create_slot = false")));
     512              :         }
     513              :     }
     514          461 : }
     515              : 
     516              : /*
     517              :  * Check that the specified publications are present on the publisher.
     518              :  */
     519              : static void
     520          134 : check_publications(WalReceiverConn *wrconn, List *publications)
     521              : {
     522              :     WalRcvExecResult *res;
     523              :     StringInfoData cmd;
     524              :     TupleTableSlot *slot;
     525          134 :     List       *publicationsCopy = NIL;
     526          134 :     Oid         tableRow[1] = {TEXTOID};
     527              : 
     528          134 :     initStringInfo(&cmd);
     529          134 :     appendStringInfoString(&cmd, "SELECT t.pubname FROM\n"
     530              :                            " pg_catalog.pg_publication t WHERE\n"
     531              :                            " t.pubname IN (");
     532          134 :     GetPublicationsStr(publications, &cmd, true);
     533          134 :     appendStringInfoChar(&cmd, ')');
     534              : 
     535          134 :     res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
     536          134 :     pfree(cmd.data);
     537              : 
     538          134 :     if (res->status != WALRCV_OK_TUPLES)
     539            0 :         ereport(ERROR,
     540              :                 errmsg("could not receive list of publications from the publisher: %s",
     541              :                        res->err));
     542              : 
     543          134 :     publicationsCopy = list_copy(publications);
     544              : 
     545              :     /* Process publication(s). */
     546          134 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     547          297 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     548              :     {
     549              :         char       *pubname;
     550              :         bool        isnull;
     551              : 
     552          163 :         pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
     553              :         Assert(!isnull);
     554              : 
     555              :         /* Delete the publication present in publisher from the list. */
     556          163 :         publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
     557          163 :         ExecClearTuple(slot);
     558              :     }
     559              : 
     560          134 :     ExecDropSingleTupleTableSlot(slot);
     561              : 
     562          134 :     walrcv_clear_result(res);
     563              : 
     564          134 :     if (list_length(publicationsCopy))
     565              :     {
     566              :         /* Prepare the list of non-existent publication(s) for error message. */
     567              :         StringInfoData pubnames;
     568              : 
     569            4 :         initStringInfo(&pubnames);
     570              : 
     571            4 :         GetPublicationsStr(publicationsCopy, &pubnames, false);
     572            4 :         ereport(WARNING,
     573              :                 errcode(ERRCODE_UNDEFINED_OBJECT),
     574              :                 errmsg_plural("publication %s does not exist on the publisher",
     575              :                               "publications %s do not exist on the publisher",
     576              :                               list_length(publicationsCopy),
     577              :                               pubnames.data));
     578              :     }
     579          134 : }
     580              : 
     581              : /*
     582              :  * Auxiliary function to build a text array out of a list of String nodes.
     583              :  */
     584              : static Datum
     585          207 : publicationListToArray(List *publist)
     586              : {
     587              :     ArrayType  *arr;
     588              :     Datum      *datums;
     589              :     MemoryContext memcxt;
     590              :     MemoryContext oldcxt;
     591              : 
     592              :     /* Create memory context for temporary allocations. */
     593          207 :     memcxt = AllocSetContextCreate(CurrentMemoryContext,
     594              :                                    "publicationListToArray to array",
     595              :                                    ALLOCSET_DEFAULT_SIZES);
     596          207 :     oldcxt = MemoryContextSwitchTo(memcxt);
     597              : 
     598          207 :     datums = palloc_array(Datum, list_length(publist));
     599              : 
     600          207 :     check_duplicates_in_publist(publist, datums);
     601              : 
     602          204 :     MemoryContextSwitchTo(oldcxt);
     603              : 
     604          204 :     arr = construct_array_builtin(datums, list_length(publist), TEXTOID);
     605              : 
     606          204 :     MemoryContextDelete(memcxt);
     607              : 
     608          204 :     return PointerGetDatum(arr);
     609              : }
     610              : 
     611              : /*
     612              :  * Create new subscription.
     613              :  */
     614              : ObjectAddress
     615          261 : CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
     616              :                    bool isTopLevel)
     617              : {
     618              :     Relation    rel;
     619              :     ObjectAddress myself;
     620              :     Oid         subid;
     621              :     bool        nulls[Natts_pg_subscription];
     622              :     Datum       values[Natts_pg_subscription];
     623          261 :     Oid         owner = GetUserId();
     624              :     HeapTuple   tup;
     625              :     Oid         serverid;
     626              :     char       *conninfo;
     627              :     char        originname[NAMEDATALEN];
     628              :     List       *publications;
     629              :     bits32      supported_opts;
     630          261 :     SubOpts     opts = {0};
     631              :     AclResult   aclresult;
     632              : 
     633              :     /*
     634              :      * Parse and check options.
     635              :      *
     636              :      * Connection and publication should not be specified here.
     637              :      */
     638          261 :     supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
     639              :                       SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
     640              :                       SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
     641              :                       SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
     642              :                       SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
     643              :                       SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
     644              :                       SUBOPT_RETAIN_DEAD_TUPLES |
     645              :                       SUBOPT_MAX_RETENTION_DURATION |
     646              :                       SUBOPT_WAL_RECEIVER_TIMEOUT | SUBOPT_ORIGIN);
     647          261 :     parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
     648              : 
     649              :     /*
     650              :      * Since creating a replication slot is not transactional, rolling back
     651              :      * the transaction leaves the created replication slot.  So we cannot run
     652              :      * CREATE SUBSCRIPTION inside a transaction block if creating a
     653              :      * replication slot.
     654              :      */
     655          216 :     if (opts.create_slot)
     656          133 :         PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
     657              : 
     658              :     /*
     659              :      * We don't want to allow unprivileged users to be able to trigger
     660              :      * attempts to access arbitrary network destinations, so require the user
     661              :      * to have been specifically authorized to create subscriptions.
     662              :      */
     663          213 :     if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
     664            3 :         ereport(ERROR,
     665              :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     666              :                  errmsg("permission denied to create subscription"),
     667              :                  errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
     668              :                            "pg_create_subscription")));
     669              : 
     670              :     /*
     671              :      * Since a subscription is a database object, we also check for CREATE
     672              :      * permission on the database.
     673              :      */
     674          210 :     aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
     675              :                                 owner, ACL_CREATE);
     676          210 :     if (aclresult != ACLCHECK_OK)
     677            6 :         aclcheck_error(aclresult, OBJECT_DATABASE,
     678            3 :                        get_database_name(MyDatabaseId));
     679              : 
     680              :     /*
     681              :      * Non-superusers are required to set a password for authentication, and
     682              :      * that password must be used by the target server, but the superuser can
     683              :      * exempt a subscription from this requirement.
     684              :      */
     685          207 :     if (!opts.passwordrequired && !superuser_arg(owner))
     686            3 :         ereport(ERROR,
     687              :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     688              :                  errmsg("password_required=false is superuser-only"),
     689              :                  errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
     690              : 
     691              :     /*
     692              :      * If built with appropriate switch, whine when regression-testing
     693              :      * conventions for subscription names are violated.
     694              :      */
     695              : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
     696              :     if (strncmp(stmt->subname, "regress_", 8) != 0)
     697              :         elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
     698              : #endif
     699              : 
     700          204 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     701              : 
     702              :     /* Check if name is used */
     703          204 :     subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
     704              :                             ObjectIdGetDatum(MyDatabaseId), CStringGetDatum(stmt->subname));
     705          204 :     if (OidIsValid(subid))
     706              :     {
     707            4 :         ereport(ERROR,
     708              :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     709              :                  errmsg("subscription \"%s\" already exists",
     710              :                         stmt->subname)));
     711              :     }
     712              : 
     713              :     /*
     714              :      * Ensure that system configuration parameters are set appropriately to
     715              :      * support retain_dead_tuples and max_retention_duration.
     716              :      */
     717          200 :     CheckSubDeadTupleRetention(true, !opts.enabled, WARNING,
     718          200 :                                opts.retaindeadtuples, opts.retaindeadtuples,
     719          200 :                                (opts.maxretention > 0));
     720              : 
     721          200 :     if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
     722          165 :         opts.slot_name == NULL)
     723          165 :         opts.slot_name = stmt->subname;
     724              : 
     725              :     /* The default for synchronous_commit of subscriptions is off. */
     726          200 :     if (opts.synchronous_commit == NULL)
     727          200 :         opts.synchronous_commit = "off";
     728              : 
     729              :     /*
     730              :      * The default for wal_receiver_timeout of subscriptions is -1, which
     731              :      * means the value is inherited from the server configuration, command
     732              :      * line, or role/database settings.
     733              :      */
     734          200 :     if (opts.wal_receiver_timeout == NULL)
     735          200 :         opts.wal_receiver_timeout = "-1";
     736              : 
     737              :     /* Load the library providing us libpq calls. */
     738          200 :     load_file("libpqwalreceiver", false);
     739              : 
     740          200 :     if (stmt->servername)
     741              :     {
     742              :         ForeignServer *server;
     743              : 
     744              :         Assert(!stmt->conninfo);
     745           14 :         conninfo = NULL;
     746              : 
     747           14 :         server = GetForeignServerByName(stmt->servername, false);
     748           14 :         aclresult = object_aclcheck(ForeignServerRelationId, server->serverid, owner, ACL_USAGE);
     749           14 :         if (aclresult != ACLCHECK_OK)
     750            3 :             aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername);
     751              : 
     752              :         /* make sure a user mapping exists */
     753           11 :         GetUserMapping(owner, server->serverid);
     754              : 
     755            8 :         serverid = server->serverid;
     756            8 :         conninfo = ForeignServerConnectionString(owner, serverid);
     757              :     }
     758              :     else
     759              :     {
     760              :         Assert(stmt->conninfo);
     761              : 
     762          186 :         serverid = InvalidOid;
     763          186 :         conninfo = stmt->conninfo;
     764              :     }
     765              : 
     766              :     /* Check the connection info string. */
     767          191 :     walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
     768              : 
     769          182 :     publications = stmt->publication;
     770              : 
     771              :     /* Everything ok, form a new tuple. */
     772          182 :     memset(values, 0, sizeof(values));
     773          182 :     memset(nulls, false, sizeof(nulls));
     774              : 
     775          182 :     subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
     776              :                                Anum_pg_subscription_oid);
     777          182 :     values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
     778          182 :     values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
     779          182 :     values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
     780          182 :     values[Anum_pg_subscription_subname - 1] =
     781          182 :         DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
     782          182 :     values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
     783          182 :     values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
     784          182 :     values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
     785          182 :     values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
     786          182 :     values[Anum_pg_subscription_subtwophasestate - 1] =
     787          182 :         CharGetDatum(opts.twophase ?
     788              :                      LOGICALREP_TWOPHASE_STATE_PENDING :
     789              :                      LOGICALREP_TWOPHASE_STATE_DISABLED);
     790          182 :     values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
     791          182 :     values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
     792          182 :     values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
     793          182 :     values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
     794          182 :     values[Anum_pg_subscription_subretaindeadtuples - 1] =
     795          182 :         BoolGetDatum(opts.retaindeadtuples);
     796          182 :     values[Anum_pg_subscription_submaxretention - 1] =
     797          182 :         Int32GetDatum(opts.maxretention);
     798          182 :     values[Anum_pg_subscription_subretentionactive - 1] =
     799          182 :         Int32GetDatum(opts.retaindeadtuples);
     800          182 :     values[Anum_pg_subscription_subserver - 1] = serverid;
     801          182 :     if (!OidIsValid(serverid))
     802          177 :         values[Anum_pg_subscription_subconninfo - 1] =
     803          177 :             CStringGetTextDatum(conninfo);
     804              :     else
     805            5 :         nulls[Anum_pg_subscription_subconninfo - 1] = true;
     806          182 :     if (opts.slot_name)
     807          171 :         values[Anum_pg_subscription_subslotname - 1] =
     808          171 :             DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
     809              :     else
     810           11 :         nulls[Anum_pg_subscription_subslotname - 1] = true;
     811          182 :     values[Anum_pg_subscription_subsynccommit - 1] =
     812          182 :         CStringGetTextDatum(opts.synchronous_commit);
     813          182 :     values[Anum_pg_subscription_subwalrcvtimeout - 1] =
     814          182 :         CStringGetTextDatum(opts.wal_receiver_timeout);
     815          179 :     values[Anum_pg_subscription_subpublications - 1] =
     816          182 :         publicationListToArray(publications);
     817          179 :     values[Anum_pg_subscription_suborigin - 1] =
     818          179 :         CStringGetTextDatum(opts.origin);
     819              : 
     820          179 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     821              : 
     822              :     /* Insert tuple into catalog. */
     823          179 :     CatalogTupleInsert(rel, tup);
     824          179 :     heap_freetuple(tup);
     825              : 
     826          179 :     recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
     827              : 
     828          179 :     ObjectAddressSet(myself, SubscriptionRelationId, subid);
     829              : 
     830          179 :     if (stmt->servername)
     831              :     {
     832              :         ObjectAddress referenced;
     833              : 
     834              :         Assert(OidIsValid(serverid));
     835              : 
     836            5 :         ObjectAddressSet(referenced, ForeignServerRelationId, serverid);
     837            5 :         recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
     838              :     }
     839              : 
     840              :     /*
     841              :      * A replication origin is currently created for all subscriptions,
     842              :      * including those that only contain sequences or are otherwise empty.
     843              :      *
     844              :      * XXX: While this is technically unnecessary, optimizing it would require
     845              :      * additional logic to skip origin creation during DDL operations and
     846              :      * apply workers initialization, and to handle origin creation dynamically
     847              :      * when tables are added to the subscription. It is not clear whether
     848              :      * preventing creation of origins is worth additional complexity.
     849              :      */
     850          179 :     ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
     851          179 :     replorigin_create(originname);
     852              : 
     853              :     /*
     854              :      * Connect to remote side to execute requested commands and fetch table
     855              :      * and sequence info.
     856              :      */
     857          179 :     if (opts.connect)
     858              :     {
     859              :         char       *err;
     860              :         WalReceiverConn *wrconn;
     861              :         bool        must_use_password;
     862              : 
     863              :         /* Try to connect to the publisher. */
     864          128 :         must_use_password = !superuser_arg(owner) && opts.passwordrequired;
     865          128 :         wrconn = walrcv_connect(conninfo, true, true, must_use_password,
     866              :                                 stmt->subname, &err);
     867          128 :         if (!wrconn)
     868            3 :             ereport(ERROR,
     869              :                     (errcode(ERRCODE_CONNECTION_FAILURE),
     870              :                      errmsg("subscription \"%s\" could not connect to the publisher: %s",
     871              :                             stmt->subname, err)));
     872              : 
     873          125 :         PG_TRY();
     874              :         {
     875          125 :             bool        has_tables = false;
     876              :             List       *pubrels;
     877              :             char        relation_state;
     878              : 
     879          125 :             check_publications(wrconn, publications);
     880          125 :             check_publications_origin_tables(wrconn, publications,
     881          125 :                                              opts.copy_data,
     882          125 :                                              opts.retaindeadtuples, opts.origin,
     883              :                                              NULL, 0, stmt->subname);
     884          125 :             check_publications_origin_sequences(wrconn, publications,
     885          125 :                                                 opts.copy_data, opts.origin,
     886              :                                                 NULL, 0, stmt->subname);
     887              : 
     888          125 :             if (opts.retaindeadtuples)
     889            3 :                 check_pub_dead_tuple_retention(wrconn);
     890              : 
     891              :             /*
     892              :              * Set sync state based on if we were asked to do data copy or
     893              :              * not.
     894              :              */
     895          125 :             relation_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
     896              : 
     897              :             /*
     898              :              * Build local relation status info. Relations are for both tables
     899              :              * and sequences from the publisher.
     900              :              */
     901          125 :             pubrels = fetch_relation_list(wrconn, publications);
     902              : 
     903          442 :             foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
     904              :             {
     905              :                 Oid         relid;
     906              :                 char        relkind;
     907          194 :                 RangeVar   *rv = pubrelinfo->rv;
     908              : 
     909          194 :                 relid = RangeVarGetRelid(rv, AccessShareLock, false);
     910          194 :                 relkind = get_rel_relkind(relid);
     911              : 
     912              :                 /* Check for supported relkind. */
     913          194 :                 CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
     914          194 :                                          rv->schemaname, rv->relname);
     915          194 :                 has_tables |= (relkind != RELKIND_SEQUENCE);
     916          194 :                 AddSubscriptionRelState(subid, relid, relation_state,
     917              :                                         InvalidXLogRecPtr, true);
     918              :             }
     919              : 
     920              :             /*
     921              :              * If requested, create permanent slot for the subscription. We
     922              :              * won't use the initial snapshot for anything, so no need to
     923              :              * export it.
     924              :              *
     925              :              * XXX: Similar to origins, it is not clear whether preventing the
     926              :              * slot creation for empty and sequence-only subscriptions is
     927              :              * worth additional complexity.
     928              :              */
     929          124 :             if (opts.create_slot)
     930              :             {
     931          119 :                 bool        twophase_enabled = false;
     932              : 
     933              :                 Assert(opts.slot_name);
     934              : 
     935              :                 /*
     936              :                  * Even if two_phase is set, don't create the slot with
     937              :                  * two-phase enabled. Will enable it once all the tables are
     938              :                  * synced and ready. This avoids race-conditions like prepared
     939              :                  * transactions being skipped due to changes not being applied
     940              :                  * due to checks in should_apply_changes_for_rel() when
     941              :                  * tablesync for the corresponding tables are in progress. See
     942              :                  * comments atop worker.c.
     943              :                  *
     944              :                  * Note that if tables were specified but copy_data is false
     945              :                  * then it is safe to enable two_phase up-front because those
     946              :                  * tables are already initially in READY state. When the
     947              :                  * subscription has no tables, we leave the twophase state as
     948              :                  * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
     949              :                  * PUBLICATION to work.
     950              :                  */
     951          119 :                 if (opts.twophase && !opts.copy_data && has_tables)
     952            1 :                     twophase_enabled = true;
     953              : 
     954          119 :                 walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
     955              :                                    opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
     956              : 
     957          119 :                 if (twophase_enabled)
     958            1 :                     UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
     959              : 
     960          119 :                 ereport(NOTICE,
     961              :                         (errmsg("created replication slot \"%s\" on publisher",
     962              :                                 opts.slot_name)));
     963              :             }
     964              :         }
     965            1 :         PG_FINALLY();
     966              :         {
     967          125 :             walrcv_disconnect(wrconn);
     968              :         }
     969          125 :         PG_END_TRY();
     970              :     }
     971              :     else
     972           51 :         ereport(WARNING,
     973              :                 (errmsg("subscription was created, but is not connected"),
     974              :                  errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.")));
     975              : 
     976          175 :     table_close(rel, RowExclusiveLock);
     977              : 
     978          175 :     pgstat_create_subscription(subid);
     979              : 
     980              :     /*
     981              :      * Notify the launcher to start the apply worker if the subscription is
     982              :      * enabled, or to create the conflict detection slot if retain_dead_tuples
     983              :      * is enabled.
     984              :      *
     985              :      * Creating the conflict detection slot is essential even when the
     986              :      * subscription is not enabled. This ensures that dead tuples are
     987              :      * retained, which is necessary for accurately identifying the type of
     988              :      * conflict during replication.
     989              :      */
     990          175 :     if (opts.enabled || opts.retaindeadtuples)
     991          118 :         ApplyLauncherWakeupAtCommit();
     992              : 
     993          175 :     InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
     994              : 
     995          175 :     return myself;
     996              : }
     997              : 
                        /*
                         * Refresh the set of relations tracked for a subscription.
                         *
                         * Connects to the publisher, fetches the relations published by
                         * sub->publications, and reconciles them with the local state entries:
                         *
                         * - a published relation with no local entry gets one, in INIT state when
                         *   copy_data is true and in READY state otherwise;
                         * - a local table that is no longer published has its entry removed, its
                         *   tablesync worker stopped and, unless it was READY, its tablesync
                         *   origin dropped;
                         * - a local sequence that is no longer published has its entry removed;
                         * - the remote tablesync slots of removed tables are dropped last.
                         *
                         * When validate_publications is not NIL it is handed to
                         * check_publications() before the relation list is fetched.  The
                         * publisher connection is closed on both the normal and the error path.
                         */
     998              : static void
     999           38 : AlterSubscription_refresh(Subscription *sub, bool copy_data,
    1000              :                           List *validate_publications)
    1001              : {
    1002              :     char       *err;
    1003           38 :     List       *pubrels = NIL;
    1004              :     Oid        *pubrel_local_oids;
    1005              :     List       *subrel_states;
    1006           38 :     List       *sub_remove_rels = NIL;
    1007              :     Oid        *subrel_local_oids;
    1008              :     Oid        *subseq_local_oids;
    1009              :     int         subrel_count;
    1010              :     ListCell   *lc;
    1011              :     int         off;
    1012           38 :     int         tbl_count = 0;
    1013           38 :     int         seq_count = 0;
    1014           38 :     Relation    rel = NULL;
                            /* A table removed from the subscription, with its last known state. */
    1015              :     typedef struct SubRemoveRels
    1016              :     {
    1017              :         Oid         relid;
    1018              :         char        state;
    1019              :     } SubRemoveRels;
    1020              : 
    1021              :     WalReceiverConn *wrconn;
    1022              :     bool        must_use_password;
    1023              : 
    1024              :     /* Load the library providing us libpq calls. */
    1025           38 :     load_file("libpqwalreceiver", false);
    1026              : 
    1027              :     /* Try to connect to the publisher. */
    1028           38 :     must_use_password = sub->passwordrequired && !sub->ownersuperuser;
    1029           38 :     wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
    1030              :                             sub->name, &err);
    1031           37 :     if (!wrconn)
    1032            0 :         ereport(ERROR,
    1033              :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1034              :                  errmsg("subscription \"%s\" could not connect to the publisher: %s",
    1035              :                         sub->name, err)));
    1036              : 
    1037           37 :     PG_TRY();
    1038              :     {
    1039           37 :         if (validate_publications)
    1040            9 :             check_publications(wrconn, validate_publications);
    1041              : 
    1042              :         /* Get the relation list from publisher. */
    1043           37 :         pubrels = fetch_relation_list(wrconn, sub->publications);
    1044              : 
    1045              :         /* Get local relation list. */
    1046           37 :         subrel_states = GetSubscriptionRelations(sub->oid, true, true, false);
    1047           37 :         subrel_count = list_length(subrel_states);
    1048              : 
    1049              :         /*
    1050              :          * Build qsorted arrays of local table oids and sequence oids for
    1051              :          * faster lookup. This can potentially contain all tables and
    1052              :          * sequences in the database so speed of lookup is important.
    1053              :          *
    1054              :          * We do not yet know the exact count of tables and sequences, so we
    1055              :          * allocate separate arrays for table OIDs and sequence OIDs based on
    1056              :          * the total number of relations (subrel_count).
    1057              :          */
    1058           37 :         subrel_local_oids = palloc(subrel_count * sizeof(Oid));
    1059           37 :         subseq_local_oids = palloc(subrel_count * sizeof(Oid));
    1060          134 :         foreach(lc, subrel_states)
    1061              :         {
    1062           97 :             SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
    1063              : 
    1064           97 :             if (get_rel_relkind(relstate->relid) == RELKIND_SEQUENCE)
    1065            9 :                 subseq_local_oids[seq_count++] = relstate->relid;
    1066              :             else
    1067           88 :                 subrel_local_oids[tbl_count++] = relstate->relid;
    1068              :         }
    1069              : 
    1070           37 :         qsort(subrel_local_oids, tbl_count, sizeof(Oid), oid_cmp);
    1071           37 :         check_publications_origin_tables(wrconn, sub->publications, copy_data,
    1072           37 :                                          sub->retaindeadtuples, sub->origin,
    1073              :                                          subrel_local_oids, tbl_count,
    1074              :                                          sub->name);
    1075              : 
    1076           37 :         qsort(subseq_local_oids, seq_count, sizeof(Oid), oid_cmp);
    1077           37 :         check_publications_origin_sequences(wrconn, sub->publications,
    1078              :                                             copy_data, sub->origin,
    1079              :                                             subseq_local_oids, seq_count,
    1080              :                                             sub->name);
    1081              : 
    1082              :         /*
    1083              :          * Walk over the remote relations and try to match them to locally
    1084              :          * known relations. If the relation is not known locally create a new
    1085              :          * state for it.
    1086              :          *
    1087              :          * Also builds array of local oids of remote relations for the next
    1088              :          * step.
    1089              :          */
    1090           37 :         off = 0;
    1091           37 :         pubrel_local_oids = palloc(list_length(pubrels) * sizeof(Oid));
    1092              : 
    1093          176 :         foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
    1094              :         {
    1095          102 :             RangeVar   *rv = pubrelinfo->rv;
    1096              :             Oid         relid;
    1097              :             char        relkind;
    1098              : 
    1099          102 :             relid = RangeVarGetRelid(rv, AccessShareLock, false);
    1100          102 :             relkind = get_rel_relkind(relid);
    1101              : 
    1102              :             /* Check for supported relkind. */
    1103          102 :             CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
    1104          102 :                                      rv->schemaname, rv->relname);
    1105              : 
    1106          102 :             pubrel_local_oids[off++] = relid;
    1107              : 
                                    /* In neither sorted local array: not yet part of the subscription. */
    1108          102 :             if (!bsearch(&relid, subrel_local_oids,
    1109           35 :                          tbl_count, sizeof(Oid), oid_cmp) &&
    1110           35 :                 !bsearch(&relid, subseq_local_oids,
    1111              :                          seq_count, sizeof(Oid), oid_cmp))
    1112              :             {
    1113           26 :                 AddSubscriptionRelState(sub->oid, relid,
    1114              :                                         copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
    1115              :                                         InvalidXLogRecPtr, true);
    1116           26 :                 ereport(DEBUG1,
    1117              :                         errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"",
    1118              :                                         relkind == RELKIND_SEQUENCE ? "sequence" : "table",
    1119              :                                         rv->schemaname, rv->relname, sub->name));
    1120              :             }
    1121              :         }
    1122              : 
    1123              :         /*
    1124              :          * Next remove state for tables we should not care about anymore using
    1125              :          * the data we collected above.
    1126              :          */
    1127           37 :         qsort(pubrel_local_oids, list_length(pubrels), sizeof(Oid), oid_cmp);
    1128              : 
    1129          125 :         for (off = 0; off < tbl_count; off++)
    1130              :         {
    1131           88 :             Oid         relid = subrel_local_oids[off];
    1132              : 
    1133           88 :             if (!bsearch(&relid, pubrel_local_oids,
    1134           88 :                          list_length(pubrels), sizeof(Oid), oid_cmp))
    1135              :             {
    1136              :                 char        state;
    1137              :                 XLogRecPtr  statelsn;
    1138           21 :                 SubRemoveRels *remove_rel = palloc_object(SubRemoveRels);
    1139              : 
    1140              :                 /*
    1141              :                  * Lock pg_subscription_rel with AccessExclusiveLock to
    1142              :                  * prevent any race conditions with the apply worker
    1143              :                  * re-launching workers at the same time this code is trying
    1144              :                  * to remove those tables.
    1145              :                  *
    1146              :                  * Even if new worker for this particular rel is restarted it
    1147              :                  * won't be able to make any progress as we hold exclusive
    1148              :                  * lock on pg_subscription_rel till the transaction end. It
    1149              :                  * will simply exit as there is no corresponding rel entry.
    1150              :                  *
    1151              :                  * This locking also ensures that the state of rels won't
    1152              :                  * change till we are done with this refresh operation.
    1153              :                  */
    1154           21 :                 if (!rel)
    1155            9 :                     rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
    1156              : 
    1157              :                 /* Last known rel state. */
    1158           21 :                 state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
    1159              : 
    1160           21 :                 RemoveSubscriptionRel(sub->oid, relid);
    1161              : 
    1162           21 :                 remove_rel->relid = relid;
    1163           21 :                 remove_rel->state = state;
    1164              : 
    1165           21 :                 sub_remove_rels = lappend(sub_remove_rels, remove_rel);
    1166              : 
    1167           21 :                 logicalrep_worker_stop(WORKERTYPE_TABLESYNC, sub->oid, relid);
    1168              : 
    1169              :                 /*
    1170              :                  * For READY state, we would have already dropped the
    1171              :                  * tablesync origin.
    1172              :                  */
    1173           21 :                 if (state != SUBREL_STATE_READY)
    1174              :                 {
    1175              :                     char        originname[NAMEDATALEN];
    1176              : 
    1177              :                     /*
    1178              :                      * Drop the tablesync's origin tracking if exists.
    1179              :                      *
    1180              :                      * It is possible that the origin is not yet created for
    1181              :                      * tablesync worker, this can happen for the states before
    1182              :                      * SUBREL_STATE_DATASYNC. The tablesync worker or apply
    1183              :                      * worker can also concurrently try to drop the origin and
    1184              :                      * by this time the origin might be already removed. For
    1185              :                      * these reasons, passing missing_ok = true.
    1186              :                      */
    1187            0 :                     ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
    1188              :                                                        sizeof(originname));
    1189            0 :                     replorigin_drop_by_name(originname, true, false);
    1190              :                 }
    1191              : 
    1192           21 :                 ereport(DEBUG1,
    1193              :                         (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
    1194              :                                          get_namespace_name(get_rel_namespace(relid)),
    1195              :                                          get_rel_name(relid),
    1196              :                                          sub->name)));
    1197              :             }
    1198              :         }
    1199              : 
    1200              :         /*
    1201              :          * Drop the tablesync slots associated with removed tables. This has
    1202              :          * to be at the end because otherwise if there is an error while doing
    1203              :          * the database operations we won't be able to rollback dropped slots.
    1204              :          */
    1205           95 :         foreach_ptr(SubRemoveRels, sub_remove_rel, sub_remove_rels)
    1206              :         {
    1207           21 :             if (sub_remove_rel->state != SUBREL_STATE_READY &&
    1208            0 :                 sub_remove_rel->state != SUBREL_STATE_SYNCDONE)
    1209              :             {
    1210            0 :                 char        syncslotname[NAMEDATALEN] = {0};
    1211              : 
    1212              :                 /*
    1213              :                  * For READY/SYNCDONE states we know the tablesync slot has
    1214              :                  * already been dropped by the tablesync worker.
    1215              :                  *
    1216              :                  * For other states, there is no certainty, maybe the slot
    1217              :                  * does not exist yet. Also, if we fail after removing some of
    1218              :                  * the slots, next time, it will again try to drop already
    1219              :                  * dropped slots and fail. For these reasons, we allow
    1220              :                  * missing_ok = true for the drop.
    1221              :                  */
    1222            0 :                 ReplicationSlotNameForTablesync(sub->oid, sub_remove_rel->relid,
    1223              :                                                 syncslotname, sizeof(syncslotname));
    1224            0 :                 ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
    1225              :             }
    1226              :         }
    1227              : 
    1228              :         /*
    1229              :          * Next remove state for sequences we should not care about anymore
    1230              :          * using the data we collected above.
    1231              :          */
    1232           46 :         for (off = 0; off < seq_count; off++)
    1233              :         {
    1234            9 :             Oid         relid = subseq_local_oids[off];
    1235              : 
    1236            9 :             if (!bsearch(&relid, pubrel_local_oids,
    1237            9 :                          list_length(pubrels), sizeof(Oid), oid_cmp))
    1238              :             {
    1239              :                 /*
    1240              :                  * This locking ensures that the state of rels won't change
    1241              :                  * till we are done with this refresh operation.
    1242              :                  */
    1243            0 :                 if (!rel)
    1244            0 :                     rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
    1245              : 
    1246            0 :                 RemoveSubscriptionRel(sub->oid, relid);
    1247              : 
    1248            0 :                 ereport(DEBUG1,
    1249              :                         errmsg_internal("sequence \"%s.%s\" removed from subscription \"%s\"",
    1250              :                                         get_namespace_name(get_rel_namespace(relid)),
    1251              :                                         get_rel_name(relid),
    1252              :                                         sub->name));
    1253              :             }
    1254              :         }
    1255              :     }
    1256            0 :     PG_FINALLY();
    1257              :     {
                                /* Runs on normal exit and on error, so the connection is not leaked. */
    1258           37 :         walrcv_disconnect(wrconn);
    1259              :     }
    1260           37 :     PG_END_TRY();
    1261              : 
                            /* Close pg_subscription_rel but keep its lock until transaction end. */
    1262           37 :     if (rel)
    1263            9 :         table_close(rel, NoLock);
    1264           37 : }
    1265              : 
    1266              : /*
    1267              :  * Marks all sequences with INIT state.
                         *
                         * Connects to the publisher, calls check_publications_origin_sequences(),
                         * and then sets every entry returned by GetSubscriptionRelations() for
                         * this subscription to SUBREL_STATE_INIT with an invalid LSN.  The
                         * publisher connection is closed on both the normal and the error path.
    1268              :  */
    1269              : static void
    1270            1 : AlterSubscription_refresh_seq(Subscription *sub)
    1271              : {
    1272            1 :     char       *err = NULL;
    1273              :     WalReceiverConn *wrconn;
    1274              :     bool        must_use_password;
    1275              : 
    1276              :     /* Load the library providing us libpq calls. */
    1277            1 :     load_file("libpqwalreceiver", false);
    1278              : 
    1279              :     /* Try to connect to the publisher. */
    1280            1 :     must_use_password = sub->passwordrequired && !sub->ownersuperuser;
    1281            1 :     wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
    1282              :                             sub->name, &err);
    1283            1 :     if (!wrconn)
    1284            0 :         ereport(ERROR,
    1285              :                 errcode(ERRCODE_CONNECTION_FAILURE),
    1286              :                 errmsg("subscription \"%s\" could not connect to the publisher: %s",
    1287              :                        sub->name, err));
    1288              : 
    1289            1 :     PG_TRY();
    1290              :     {
    1291              :         List       *subrel_states;
    1292              : 
    1293            1 :         check_publications_origin_sequences(wrconn, sub->publications, true,
    1294              :                                             sub->origin, NULL, 0, sub->name);
    1295              : 
    1296              :         /* Get local sequence list. */
    1297            1 :         subrel_states = GetSubscriptionRelations(sub->oid, false, true, false);
    1298            5 :         foreach_ptr(SubscriptionRelState, subrel, subrel_states)
    1299              :         {
    1300            3 :             Oid         relid = subrel->relid;
    1301              : 
    1302            3 :             UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT,
    1303              :                                        InvalidXLogRecPtr, false);
    1304            3 :             ereport(DEBUG1,
    1305              :                     errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
    1306              :                                     get_namespace_name(get_rel_namespace(relid)),
    1307              :                                     get_rel_name(relid),
    1308              :                                     sub->name));
    1309              :         }
    1310              :     }
    1311            0 :     PG_FINALLY();
    1312              :     {
                                /* Runs on normal exit and on error, so the connection is not leaked. */
    1313            1 :         walrcv_disconnect(wrconn);
    1314              :     }
    1315            1 :     PG_END_TRY();
    1316            1 : }
    1317              : 
    1318              : /*
    1319              :  * Common checks for altering failover, two_phase, and retain_dead_tuples
    1320              :  * options.
                         *
                         * option must be one of the three names above (asserted).  An error is
                         * raised if the subscription is enabled.  When slot_needs_update is true,
                         * an error is also raised if the subscription has no slot name, and
                         * isTopLevel is passed on to PreventInTransactionBlock().
    1321              :  */
    1322              : static void
    1323           13 : CheckAlterSubOption(Subscription *sub, const char *option,
    1324              :                     bool slot_needs_update, bool isTopLevel)
    1325              : {
    1326              :     Assert(strcmp(option, "failover") == 0 ||
    1327              :            strcmp(option, "two_phase") == 0 ||
    1328              :            strcmp(option, "retain_dead_tuples") == 0);
    1329              : 
    1330              :     /*
    1331              :      * Altering the retain_dead_tuples option does not update the slot on the
    1332              :      * publisher.
    1333              :      */
    1334              :     Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0);
    1335              : 
    1336              :     /*
    1337              :      * Do not allow changing the option if the subscription is enabled. This
    1338              :      * is because both failover and two_phase options of the slot on the
    1339              :      * publisher cannot be modified if the slot is currently acquired by the
    1340              :      * existing walsender.
    1341              :      *
    1342              :      * Note that two_phase is enabled (aka changed from 'false' to 'true') on
    1343              :      * the publisher by the existing walsender, so we could have allowed that
    1344              :      * even when the subscription is enabled. But we kept this restriction for
    1345              :      * the sake of consistency and simplicity.
    1346              :      *
    1347              :      * Additionally, do not allow changing the retain_dead_tuples option when
    1348              :      * the subscription is enabled to prevent race conditions arising from the
    1349              :      * new option value being acknowledged asynchronously by the launcher and
    1350              :      * apply workers.
    1351              :      *
    1352              :      * Without the restriction, a race condition may arise when a user
    1353              :      * disables and immediately re-enables the retain_dead_tuples option. In
    1354              :      * this case, the launcher might drop the slot upon noticing the disabled
    1355              :      * action, while the apply worker may keep maintaining
    1356              :      * oldest_nonremovable_xid without noticing the option change. During this
    1357              :      * period, a transaction ID wraparound could falsely make this ID appear
    1358              :      * as if it originates from the future w.r.t the transaction ID stored in
    1359              :      * the slot maintained by launcher.
    1360              :      *
    1361              :      * Similarly, if the user enables retain_dead_tuples concurrently with the
    1362              :      * launcher starting the worker, the apply worker may start calculating
    1363              :      * oldest_nonremovable_xid before the launcher notices the enable action.
    1364              :      * Consequently, the launcher may update slot.xmin to a newer value than
    1365              :      * that maintained by the worker. In subsequent cycles, upon integrating
    1366              :      * the worker's oldest_nonremovable_xid, the launcher might detect a
    1367              :      * retreat in the calculated xmin, necessitating additional handling.
    1368              :      *
    1369              :      * XXX To address the above race conditions, we could define
    1370              :      * oldest_nonremovable_xid as FullTransactionId and add a check to
    1371              :      * disallow retreating the conflict slot's xmin. For now, we kept the
    1372              :      * implementation simple by disallowing changes to retain_dead_tuples,
    1373              :      * but in the future we can change this after some more analysis.
    1374              :      *
    1375              :      * Note that we could restrict only the enabling of retain_dead_tuples to
    1376              :      * avoid the race conditions described above, but we maintain the
    1377              :      * restriction for both enable and disable operations for the sake of
    1378              :      * consistency.
    1379              :      */
    1380           13 :     if (sub->enabled)
    1381            2 :         ereport(ERROR,
    1382              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1383              :                  errmsg("cannot set option \"%s\" for enabled subscription",
    1384              :                         option)));
    1385              : 
    1386           11 :     if (slot_needs_update)
    1387              :     {
    1388              :         StringInfoData cmd;
    1389              : 
    1390              :         /*
    1391              :          * A valid slot must be associated with the subscription for us to
    1392              :          * modify any of the slot's properties.
    1393              :          */
    1394            8 :         if (!sub->slotname)
    1395            0 :             ereport(ERROR,
    1396              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1397              :                      errmsg("cannot set option \"%s\" for a subscription that does not have a slot name",
    1398              :                             option)));
    1399              : 
    1400              :         /* The changed option of the slot can't be rolled back. */
    1401            8 :         initStringInfo(&cmd);
    1402            8 :         appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
    1403              : 
                                /* Hence the command is refused inside a transaction block. */
    1404            8 :         PreventInTransactionBlock(isTopLevel, cmd.data);
    1405            5 :         pfree(cmd.data);
    1406              :     }
    1407            8 : }
    1408              : 
    1409              : /*
    1410              :  * Alter the existing subscription.
    1411              :  */
    1412              : ObjectAddress
    1413          277 : AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
    1414              :                   bool isTopLevel)
    1415              : {
    1416              :     Relation    rel;
    1417              :     ObjectAddress myself;
    1418              :     bool        nulls[Natts_pg_subscription];
    1419              :     bool        replaces[Natts_pg_subscription];
    1420              :     Datum       values[Natts_pg_subscription];
    1421              :     HeapTuple   tup;
    1422              :     Oid         subid;
    1423          277 :     bool        update_tuple = false;
    1424          277 :     bool        update_failover = false;
    1425          277 :     bool        update_two_phase = false;
    1426          277 :     bool        check_pub_rdt = false;
    1427              :     bool        retain_dead_tuples;
    1428              :     int         max_retention;
    1429              :     bool        retention_active;
    1430              :     char       *origin;
    1431              :     Subscription *sub;
    1432              :     Form_pg_subscription form;
    1433              :     bits32      supported_opts;
    1434          277 :     SubOpts     opts = {0};
    1435              : 
    1436          277 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    1437              : 
    1438              :     /* Fetch the existing tuple. */
    1439          277 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
    1440              :                               CStringGetDatum(stmt->subname));
    1441              : 
    1442          277 :     if (!HeapTupleIsValid(tup))
    1443            3 :         ereport(ERROR,
    1444              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1445              :                  errmsg("subscription \"%s\" does not exist",
    1446              :                         stmt->subname)));
    1447              : 
    1448          274 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    1449          274 :     subid = form->oid;
    1450              : 
    1451              :     /* must be owner */
    1452          274 :     if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
    1453            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
    1454            0 :                        stmt->subname);
    1455              : 
    1456              :     /*
    1457              :      * Skip ACL checks on the subscription's foreign server, if any. If
    1458              :      * changing the server (or replacing it with a raw connection), then the
    1459              :      * old one will be removed anyway. If changing something unrelated,
    1460              :      * there's no need to do an additional ACL check here; that will be done
    1461              :      * by the subscription worker anyway.
    1462              :      */
    1463          274 :     sub = GetSubscription(subid, false, false);
    1464              : 
    1465          274 :     retain_dead_tuples = sub->retaindeadtuples;
    1466          274 :     origin = sub->origin;
    1467          274 :     max_retention = sub->maxretention;
    1468          274 :     retention_active = sub->retentionactive;
    1469              : 
    1470              :     /*
    1471              :      * Don't allow non-superuser modification of a subscription with
    1472              :      * password_required=false.
    1473              :      */
    1474          274 :     if (!sub->passwordrequired && !superuser())
    1475            0 :         ereport(ERROR,
    1476              :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1477              :                  errmsg("password_required=false is superuser-only"),
    1478              :                  errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
    1479              : 
    1480              :     /* Lock the subscription so nobody else can do anything with it. */
    1481          274 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
    1482              : 
    1483              :     /* Form a new tuple. */
    1484          274 :     memset(values, 0, sizeof(values));
    1485          274 :     memset(nulls, false, sizeof(nulls));
    1486          274 :     memset(replaces, false, sizeof(replaces));
    1487              : 
    1488          274 :     ObjectAddressSet(myself, SubscriptionRelationId, subid);
    1489              : 
    1490          274 :     switch (stmt->kind)
    1491              :     {
    1492          121 :         case ALTER_SUBSCRIPTION_OPTIONS:
    1493              :             {
    1494          121 :                 supported_opts = (SUBOPT_SLOT_NAME |
    1495              :                                   SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
    1496              :                                   SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
    1497              :                                   SUBOPT_DISABLE_ON_ERR |
    1498              :                                   SUBOPT_PASSWORD_REQUIRED |
    1499              :                                   SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
    1500              :                                   SUBOPT_RETAIN_DEAD_TUPLES |
    1501              :                                   SUBOPT_MAX_RETENTION_DURATION |
    1502              :                                   SUBOPT_WAL_RECEIVER_TIMEOUT |
    1503              :                                   SUBOPT_ORIGIN);
    1504              : 
    1505          121 :                 parse_subscription_options(pstate, stmt->options,
    1506              :                                            supported_opts, &opts);
    1507              : 
    1508          109 :                 if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
    1509              :                 {
    1510              :                     /*
    1511              :                      * The subscription must be disabled to allow slot_name as
    1512              :                      * 'none'; otherwise, the apply worker will repeatedly try
    1513              :                      * to stream the data using that slot_name, which does not
    1514              :                      * exist on the publisher and which the user will not be
    1515              :                      * allowed to create.
    1516              :                      */
    1517           39 :                     if (sub->enabled && !opts.slot_name)
    1518            0 :                         ereport(ERROR,
    1519              :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1520              :                                  errmsg("cannot set %s for enabled subscription",
    1521              :                                         "slot_name = NONE")));
    1522              : 
    1523           39 :                     if (opts.slot_name)
    1524            3 :                         values[Anum_pg_subscription_subslotname - 1] =
    1525            3 :                             DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
    1526              :                     else
    1527           36 :                         nulls[Anum_pg_subscription_subslotname - 1] = true;
    1528           39 :                     replaces[Anum_pg_subscription_subslotname - 1] = true;
    1529              :                 }
    1530              : 
    1531          109 :                 if (opts.synchronous_commit)
    1532              :                 {
    1533            3 :                     values[Anum_pg_subscription_subsynccommit - 1] =
    1534            3 :                         CStringGetTextDatum(opts.synchronous_commit);
    1535            3 :                     replaces[Anum_pg_subscription_subsynccommit - 1] = true;
    1536              :                 }
    1537              : 
    1538          109 :                 if (IsSet(opts.specified_opts, SUBOPT_BINARY))
    1539              :                 {
    1540            9 :                     values[Anum_pg_subscription_subbinary - 1] =
    1541            9 :                         BoolGetDatum(opts.binary);
    1542            9 :                     replaces[Anum_pg_subscription_subbinary - 1] = true;
    1543              :                 }
    1544              : 
    1545          109 :                 if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
    1546              :                 {
    1547           15 :                     values[Anum_pg_subscription_substream - 1] =
    1548           15 :                         CharGetDatum(opts.streaming);
    1549           15 :                     replaces[Anum_pg_subscription_substream - 1] = true;
    1550              :                 }
    1551              : 
    1552          109 :                 if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
    1553              :                 {
    1554              :                     values[Anum_pg_subscription_subdisableonerr - 1]
    1555            3 :                         = BoolGetDatum(opts.disableonerr);
    1556              :                     replaces[Anum_pg_subscription_subdisableonerr - 1]
    1557            3 :                         = true;
    1558              :                 }
    1559              : 
    1560          109 :                 if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
    1561              :                 {
    1562              :                     /* Non-superuser may not disable password_required. */
    1563            6 :                     if (!opts.passwordrequired && !superuser())
    1564            0 :                         ereport(ERROR,
    1565              :                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1566              :                                  errmsg("password_required=false is superuser-only"),
    1567              :                                  errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
    1568              : 
    1569              :                     values[Anum_pg_subscription_subpasswordrequired - 1]
    1570            6 :                         = BoolGetDatum(opts.passwordrequired);
    1571              :                     replaces[Anum_pg_subscription_subpasswordrequired - 1]
    1572            6 :                         = true;
    1573              :                 }
    1574              : 
    1575          109 :                 if (IsSet(opts.specified_opts, SUBOPT_RUN_AS_OWNER))
    1576              :                 {
    1577            7 :                     values[Anum_pg_subscription_subrunasowner - 1] =
    1578            7 :                         BoolGetDatum(opts.runasowner);
    1579            7 :                     replaces[Anum_pg_subscription_subrunasowner - 1] = true;
    1580              :                 }
    1581              : 
    1582          109 :                 if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
    1583              :                 {
    1584              :                     /*
    1585              :                      * We need to update both the slot and the subscription
    1586              :                      * for the two_phase option. We can enable the two_phase
    1587              :                      * option for a slot only once the initial data
    1588              :                      * synchronization is done. This is to avoid missing some
    1589              :                      * data as explained in comments atop worker.c.
    1590              :                      */
    1591            3 :                     update_two_phase = !opts.twophase;
    1592              : 
    1593            3 :                     CheckAlterSubOption(sub, "two_phase", update_two_phase,
    1594              :                                         isTopLevel);
    1595              : 
    1596              :                     /*
    1597              :                      * Modifying the two_phase slot option requires a slot
    1598              :                      * lookup by slot name, so changing the slot name at the
    1599              :                      * same time is not allowed.
    1600              :                      */
    1601            3 :                     if (update_two_phase &&
    1602            1 :                         IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
    1603            0 :                         ereport(ERROR,
    1604              :                                 (errcode(ERRCODE_SYNTAX_ERROR),
    1605              :                                  errmsg("\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
    1606              : 
    1607              :                     /*
    1608              :                      * Note that workers may still survive even if the
    1609              :                      * subscription has been disabled.
    1610              :                      *
    1611              :                      * Ensure workers have already exited to avoid
    1612              :                      * getting prepared transactions while we are disabling
    1613              :                      * the two_phase option. Otherwise, the changes of an
    1614              :                      * already prepared transaction can be replicated again
    1615              :                      * along with its corresponding commit, leading to
    1616              :                      * duplicate data or errors.
    1617              :                      */
    1618            3 :                     if (logicalrep_workers_find(subid, true, true))
    1619            0 :                         ereport(ERROR,
    1620              :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1621              :                                  errmsg("cannot alter \"two_phase\" when logical replication worker is still running"),
    1622              :                                  errhint("Try again after some time.")));
    1623              : 
    1624              :                     /*
    1625              :                      * two_phase cannot be disabled if there are any
    1626              :                      * uncommitted prepared transactions present; otherwise it
    1627              :                      * can lead to duplicate data or errors as explained in
    1628              :                      * the comment above.
    1629              :                      */
    1630            3 :                     if (update_two_phase &&
    1631            1 :                         sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
    1632            1 :                         LookupGXactBySubid(subid))
    1633            0 :                         ereport(ERROR,
    1634              :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1635              :                                  errmsg("cannot disable \"two_phase\" when prepared transactions exist"),
    1636              :                                  errhint("Resolve these transactions and try again.")));
    1637              : 
    1638              :                     /* Change system catalog accordingly */
    1639            3 :                     values[Anum_pg_subscription_subtwophasestate - 1] =
    1640            3 :                         CharGetDatum(opts.twophase ?
    1641              :                                      LOGICALREP_TWOPHASE_STATE_PENDING :
    1642              :                                      LOGICALREP_TWOPHASE_STATE_DISABLED);
    1643            3 :                     replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
    1644              :                 }
    1645              : 
    1646          109 :                 if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
    1647              :                 {
    1648              :                     /*
    1649              :                      * Similar to the two_phase case above, we need to update
    1650              :                      * the failover option for both the slot and the
    1651              :                      * subscription.
    1652              :                      */
    1653            8 :                     update_failover = true;
    1654              : 
    1655            8 :                     CheckAlterSubOption(sub, "failover", update_failover,
    1656              :                                         isTopLevel);
    1657              : 
    1658            4 :                     values[Anum_pg_subscription_subfailover - 1] =
    1659            4 :                         BoolGetDatum(opts.failover);
    1660            4 :                     replaces[Anum_pg_subscription_subfailover - 1] = true;
    1661              :                 }
    1662              : 
    1663          105 :                 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
    1664              :                 {
    1665            2 :                     values[Anum_pg_subscription_subretaindeadtuples - 1] =
    1666            2 :                         BoolGetDatum(opts.retaindeadtuples);
    1667            2 :                     replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true;
    1668              : 
    1669              :                     /*
    1670              :                      * Update the retention status only if there's a change in
    1671              :                      * the retain_dead_tuples option value.
    1672              :                      *
    1673              :                      * Automatically marking retention as active when
    1674              :                      * retain_dead_tuples is enabled may not always be ideal,
    1675              :                      * especially if retention was previously stopped and the
    1676              :                      * user toggles retain_dead_tuples without adjusting the
    1677              :                      * publisher workload. However, this behavior provides a
    1678              :                      * convenient way for users to manually refresh the
    1679              :                      * retention status. Since retention will be stopped again
    1680              :                      * unless the publisher workload is reduced, this approach
    1681              :                      * is acceptable for now.
    1682              :                      */
    1683            2 :                     if (opts.retaindeadtuples != sub->retaindeadtuples)
    1684              :                     {
    1685            2 :                         values[Anum_pg_subscription_subretentionactive - 1] =
    1686            2 :                             BoolGetDatum(opts.retaindeadtuples);
    1687            2 :                         replaces[Anum_pg_subscription_subretentionactive - 1] = true;
    1688              : 
    1689            2 :                         retention_active = opts.retaindeadtuples;
    1690              :                     }
    1691              : 
    1692            2 :                     CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
    1693              : 
    1694              :                     /*
    1695              :                      * Workers may continue running even after the
    1696              :                      * subscription has been disabled.
    1697              :                      *
    1698              :                      * To prevent race conditions (as described in
    1699              :                      * CheckAlterSubOption()), ensure that all worker
    1700              :                      * processes have already exited before proceeding.
    1701              :                      */
    1702            1 :                     if (logicalrep_workers_find(subid, true, true))
    1703            0 :                         ereport(ERROR,
    1704              :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1705              :                                  errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
    1706              :                                  errhint("Try again after some time.")));
    1707              : 
    1708              :                     /*
    1709              :                      * Notify the launcher to manage the replication slot for
    1710              :                      * conflict detection. This ensures that replication slot
    1711              :                      * is efficiently handled (created, updated, or dropped)
    1712              :                      * in response to any configuration changes.
    1713              :                      */
    1714            1 :                     ApplyLauncherWakeupAtCommit();
    1715              : 
    1716            1 :                     check_pub_rdt = opts.retaindeadtuples;
    1717            1 :                     retain_dead_tuples = opts.retaindeadtuples;
    1718              :                 }
    1719              : 
    1720          104 :                 if (IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
    1721              :                 {
    1722            5 :                     values[Anum_pg_subscription_submaxretention - 1] =
    1723            5 :                         Int32GetDatum(opts.maxretention);
    1724            5 :                     replaces[Anum_pg_subscription_submaxretention - 1] = true;
    1725              : 
    1726            5 :                     max_retention = opts.maxretention;
    1727              :                 }
    1728              : 
    1729              :                 /*
    1730              :                  * Ensure that system configuration parameters are set
    1731              :                  * appropriately to support retain_dead_tuples and
    1732              :                  * max_retention_duration.
    1733              :                  */
    1734          104 :                 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) ||
    1735          103 :                     IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
    1736            6 :                     CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE,
    1737              :                                                retain_dead_tuples,
    1738              :                                                retention_active,
    1739            6 :                                                (max_retention > 0));
    1740              : 
    1741          104 :                 if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
    1742              :                 {
    1743            5 :                     values[Anum_pg_subscription_suborigin - 1] =
    1744            5 :                         CStringGetTextDatum(opts.origin);
    1745            5 :                     replaces[Anum_pg_subscription_suborigin - 1] = true;
    1746              : 
    1747              :                     /*
    1748              :                      * Check if changes from different origins may be received
    1749              :                      * from the publisher when the origin is changed to ANY
    1750              :                      * and retain_dead_tuples is enabled.
    1751              :                      */
    1752            7 :                     check_pub_rdt = retain_dead_tuples &&
    1753            2 :                         pg_strcasecmp(opts.origin, LOGICALREP_ORIGIN_ANY) == 0;
    1754              : 
    1755            5 :                     origin = opts.origin;
    1756              :                 }
    1757              : 
    1758          104 :                 if (IsSet(opts.specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
    1759              :                 {
    1760            6 :                     values[Anum_pg_subscription_subwalrcvtimeout - 1] =
    1761            6 :                         CStringGetTextDatum(opts.wal_receiver_timeout);
    1762            6 :                     replaces[Anum_pg_subscription_subwalrcvtimeout - 1] = true;
    1763              :                 }
    1764              : 
    1765          104 :                 update_tuple = true;
    1766          104 :                 break;
    1767              :             }
    1768              : 
    1769           53 :         case ALTER_SUBSCRIPTION_ENABLED:
    1770              :             {
    1771           53 :                 parse_subscription_options(pstate, stmt->options,
    1772              :                                            SUBOPT_ENABLED, &opts);
    1773              :                 Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
    1774              : 
    1775           53 :                 if (!sub->slotname && opts.enabled)
    1776            3 :                     ereport(ERROR,
    1777              :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1778              :                              errmsg("cannot enable subscription that does not have a slot name")));
    1779              : 
    1780              :                 /*
    1781              :                  * Check track_commit_timestamp only when enabling the
    1782              :                  * subscription in case it was disabled after creation. See
    1783              :                  * comments atop CheckSubDeadTupleRetention() for details.
    1784              :                  */
    1785           50 :                 CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
    1786           50 :                                            WARNING, sub->retaindeadtuples,
    1787           50 :                                            sub->retentionactive, false);
    1788              : 
    1789           50 :                 values[Anum_pg_subscription_subenabled - 1] =
    1790           50 :                     BoolGetDatum(opts.enabled);
    1791           50 :                 replaces[Anum_pg_subscription_subenabled - 1] = true;
    1792              : 
    1793           50 :                 if (opts.enabled)
    1794           28 :                     ApplyLauncherWakeupAtCommit();
    1795              : 
    1796           50 :                 update_tuple = true;
    1797              : 
    1798              :                 /*
    1799              :                  * The subscription might be initially created with
    1800              :                  * connect=false and retain_dead_tuples=true, meaning the
    1801              :                  * remote server's status may not be checked. Ensure this
    1802              :                  * check is conducted now.
    1803              :                  */
    1804           50 :                 check_pub_rdt = sub->retaindeadtuples && opts.enabled;
    1805           50 :                 break;
    1806              :             }
    1807              : 
    1808            0 :         case ALTER_SUBSCRIPTION_SERVER:
    1809              :             {
    1810              :                 ForeignServer *new_server;
    1811              :                 ObjectAddress referenced;
    1812              :                 AclResult   aclresult;
    1813              :                 char       *conninfo;
    1814              : 
    1815              :                 /*
    1816              :                  * Remove what was there before, either another foreign server
    1817              :                  * or a connection string.
    1818              :                  */
    1819            0 :                 if (form->subserver)
    1820              :                 {
    1821            0 :                     deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
    1822              :                                                        DEPENDENCY_NORMAL,
    1823              :                                                        ForeignServerRelationId, form->subserver);
    1824              :                 }
    1825              :                 else
    1826              :                 {
    1827            0 :                     nulls[Anum_pg_subscription_subconninfo - 1] = true;
    1828            0 :                     replaces[Anum_pg_subscription_subconninfo - 1] = true;
    1829              :                 }
    1830              : 
    1831              :                 /*
    1832              :                  * Check that the subscription owner has USAGE privileges on
    1833              :                  * the server.
    1834              :                  */
    1835            0 :                 new_server = GetForeignServerByName(stmt->servername, false);
    1836            0 :                 aclresult = object_aclcheck(ForeignServerRelationId,
    1837              :                                             new_server->serverid,
    1838              :                                             form->subowner, ACL_USAGE);
    1839            0 :                 if (aclresult != ACLCHECK_OK)
    1840            0 :                     ereport(ERROR,
    1841              :                             errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1842              :                             errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
    1843              :                                    GetUserNameFromId(form->subowner, false),
    1844              :                                    ForeignServerName(new_server->serverid)));
    1845              : 
    1846              :                 /* make sure a user mapping exists */
    1847            0 :                 GetUserMapping(form->subowner, new_server->serverid);
    1848              : 
    1849            0 :                 conninfo = ForeignServerConnectionString(form->subowner,
    1850              :                                                          new_server->serverid);
    1851              : 
    1852              :                 /* Load the library providing us libpq calls. */
    1853            0 :                 load_file("libpqwalreceiver", false);
    1854              :                 /* Check the connection info string. */
    1855            0 :                 walrcv_check_conninfo(conninfo,
    1856              :                                       sub->passwordrequired && !sub->ownersuperuser);
    1857              : 
    1858            0 :                 values[Anum_pg_subscription_subserver - 1] = new_server->serverid;
    1859            0 :                 replaces[Anum_pg_subscription_subserver - 1] = true;
    1860              : 
    1861            0 :                 ObjectAddressSet(referenced, ForeignServerRelationId, new_server->serverid);
    1862            0 :                 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
    1863              : 
    1864            0 :                 update_tuple = true;
    1865              :             }
    1866            0 :             break;
    1867              : 
    1868           10 :         case ALTER_SUBSCRIPTION_CONNECTION:
    1869              :             /* remove reference to foreign server and dependencies, if present */
    1870           10 :             if (form->subserver)
    1871              :             {
    1872            0 :                 deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
    1873              :                                                    DEPENDENCY_NORMAL,
    1874              :                                                    ForeignServerRelationId, form->subserver);
    1875              : 
    1876            0 :                 values[Anum_pg_subscription_subserver - 1] = InvalidOid;
    1877            0 :                 replaces[Anum_pg_subscription_subserver - 1] = true;
    1878              :             }
    1879              : 
    1880              :             /* Load the library providing us libpq calls. */
    1881           10 :             load_file("libpqwalreceiver", false);
    1882              :             /* Check the connection info string. */
    1883           10 :             walrcv_check_conninfo(stmt->conninfo,
    1884              :                                   sub->passwordrequired && !sub->ownersuperuser);
    1885              : 
    1886            7 :             values[Anum_pg_subscription_subconninfo - 1] =
    1887            7 :                 CStringGetTextDatum(stmt->conninfo);
    1888            7 :             replaces[Anum_pg_subscription_subconninfo - 1] = true;
    1889            7 :             update_tuple = true;
    1890              : 
    1891              :             /*
    1892              :              * Since the remote server configuration might have changed,
    1893              :              * perform a check to ensure it permits enabling
    1894              :              * retain_dead_tuples.
    1895              :              */
    1896            7 :             check_pub_rdt = sub->retaindeadtuples;
    1897            7 :             break;
    1898              : 
    1899           16 :         case ALTER_SUBSCRIPTION_SET_PUBLICATION:
    1900              :             {
    1901           16 :                 supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
    1902           16 :                 parse_subscription_options(pstate, stmt->options,
    1903              :                                            supported_opts, &opts);
    1904              : 
    1905           16 :                 values[Anum_pg_subscription_subpublications - 1] =
    1906           16 :                     publicationListToArray(stmt->publication);
    1907           16 :                 replaces[Anum_pg_subscription_subpublications - 1] = true;
    1908              : 
    1909           16 :                 update_tuple = true;
    1910              : 
    1911              :                 /* Refresh if user asked us to. */
    1912           16 :                 if (opts.refresh)
    1913              :                 {
    1914           13 :                     if (!sub->enabled)
    1915            0 :                         ereport(ERROR,
    1916              :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1917              :                                  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
    1918              :                                  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
    1919              : 
    1920              :                     /*
    1921              :                      * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
    1922              :                      * why this is not allowed.
    1923              :                      */
    1924           13 :                     if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
    1925            0 :                         ereport(ERROR,
    1926              :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1927              :                                  errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
    1928              :                                  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
    1929              : 
    1930           13 :                     PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
    1931              : 
    1932              :                     /* Make sure refresh sees the new list of publications. */
    1933            7 :                     sub->publications = stmt->publication;
    1934              : 
    1935            7 :                     AlterSubscription_refresh(sub, opts.copy_data,
    1936              :                                               stmt->publication);
    1937              :                 }
    1938              : 
    1939           10 :                 break;
    1940              :             }
    1941              : 
    1942           27 :         case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
    1943              :         case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
    1944              :             {
    1945              :                 List       *publist;
    1946           27 :                 bool        isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
    1947              : 
    1948           27 :                 supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
    1949           27 :                 parse_subscription_options(pstate, stmt->options,
    1950              :                                            supported_opts, &opts);
    1951              : 
    1952           27 :                 publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
    1953            9 :                 values[Anum_pg_subscription_subpublications - 1] =
    1954            9 :                     publicationListToArray(publist);
    1955            9 :                 replaces[Anum_pg_subscription_subpublications - 1] = true;
    1956              : 
    1957            9 :                 update_tuple = true;
    1958              : 
    1959              :                 /* Refresh if user asked us to. */
    1960            9 :                 if (opts.refresh)
    1961              :                 {
    1962              :                     /* We only need to validate user specified publications. */
    1963            3 :                     List       *validate_publications = (isadd) ? stmt->publication : NULL;
    1964              : 
    1965            3 :                     if (!sub->enabled)
    1966            0 :                         ereport(ERROR,
    1967              :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1968              :                                  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
    1969              :                         /* translator: %s is an SQL ALTER command */
    1970              :                                  errhint("Use %s instead.",
    1971              :                                          isadd ?
    1972              :                                          "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
    1973              :                                          "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
    1974              : 
    1975              :                     /*
    1976              :                      * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
    1977              :                      * why this is not allowed.
    1978              :                      */
    1979            3 :                     if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
    1980            0 :                         ereport(ERROR,
    1981              :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1982              :                                  errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
    1983              :                         /* translator: %s is an SQL ALTER command */
    1984              :                                  errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
    1985              :                                          isadd ?
    1986              :                                          "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
    1987              :                                          "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
    1988              : 
    1989            3 :                     PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
    1990              : 
    1991              :                     /* Refresh the new list of publications. */
    1992            3 :                     sub->publications = publist;
    1993              : 
    1994            3 :                     AlterSubscription_refresh(sub, opts.copy_data,
    1995              :                                               validate_publications);
    1996              :                 }
    1997              : 
    1998            9 :                 break;
    1999              :             }
    2000              : 
    2001           34 :         case ALTER_SUBSCRIPTION_REFRESH_PUBLICATION:
    2002              :             {
    2003           34 :                 if (!sub->enabled)
    2004            3 :                     ereport(ERROR,
    2005              :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2006              :                              errmsg("%s is not allowed for disabled subscriptions",
    2007              :                                     "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
    2008              : 
    2009           31 :                 parse_subscription_options(pstate, stmt->options,
    2010              :                                            SUBOPT_COPY_DATA, &opts);
    2011              : 
    2012              :                 /*
    2013              :                  * The subscription option "two_phase" requires that
    2014              :                  * replication has passed the initial table synchronization
    2015              :                  * phase before the two_phase becomes properly enabled.
    2016              :                  *
    2017              :                  * But, having reached this two-phase commit "enabled" state
    2018              :                  * we must not allow any subsequent table initialization to
    2019              :                  * occur. So the ALTER SUBSCRIPTION ... REFRESH PUBLICATION is
    2020              :                  * disallowed when the user had requested two_phase = on mode.
    2021              :                  *
    2022              :                  * The exception to this restriction is when copy_data =
    2023              :                  * false, because when copy_data is false the tablesync will
    2024              :                  * start already in READY state and will exit directly without
    2025              :                  * doing anything.
    2026              :                  *
    2027              :                  * For more details see comments atop worker.c.
    2028              :                  */
    2029           31 :                 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
    2030            0 :                     ereport(ERROR,
    2031              :                             (errcode(ERRCODE_SYNTAX_ERROR),
    2032              :                              errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"),
    2033              :                              errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
    2034              : 
    2035           31 :                 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION");
    2036              : 
    2037           28 :                 AlterSubscription_refresh(sub, opts.copy_data, NULL);
    2038              : 
    2039           27 :                 break;
    2040              :             }
    2041              : 
    2042            1 :         case ALTER_SUBSCRIPTION_REFRESH_SEQUENCES:
    2043              :             {
    2044            1 :                 if (!sub->enabled)
    2045            0 :                     ereport(ERROR,
    2046              :                             errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2047              :                             errmsg("%s is not allowed for disabled subscriptions",
    2048              :                                    "ALTER SUBSCRIPTION ... REFRESH SEQUENCES"));
    2049              : 
    2050            1 :                 AlterSubscription_refresh_seq(sub);
    2051              : 
    2052            1 :                 break;
    2053              :             }
    2054              : 
    2055           12 :         case ALTER_SUBSCRIPTION_SKIP:
    2056              :             {
    2057           12 :                 parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
    2058              : 
    2059              :                 /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
    2060              :                 Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
    2061              : 
    2062              :                 /*
    2063              :                  * If the user sets subskiplsn, we do a sanity check to make
    2064              :                  * sure that the specified LSN is a probable value.
    2065              :                  */
    2066            9 :                 if (XLogRecPtrIsValid(opts.lsn))
    2067              :                 {
    2068              :                     ReplOriginId originid;
    2069              :                     char        originname[NAMEDATALEN];
    2070              :                     XLogRecPtr  remote_lsn;
    2071              : 
    2072            6 :                     ReplicationOriginNameForLogicalRep(subid, InvalidOid,
    2073              :                                                        originname, sizeof(originname));
    2074            6 :                     originid = replorigin_by_name(originname, false);
    2075            6 :                     remote_lsn = replorigin_get_progress(originid, false);
    2076              : 
    2077              :                     /* Check the given LSN is at least a future LSN */
    2078            6 :                     if (XLogRecPtrIsValid(remote_lsn) && opts.lsn < remote_lsn)
    2079            0 :                         ereport(ERROR,
    2080              :                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2081              :                                  errmsg("skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
    2082              :                                         LSN_FORMAT_ARGS(opts.lsn),
    2083              :                                         LSN_FORMAT_ARGS(remote_lsn))));
    2084              :                 }
    2085              : 
    2086            9 :                 values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
    2087            9 :                 replaces[Anum_pg_subscription_subskiplsn - 1] = true;
    2088              : 
    2089            9 :                 update_tuple = true;
    2090            9 :                 break;
    2091              :             }
    2092              : 
    2093            0 :         default:
    2094            0 :             elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
    2095              :                  stmt->kind);
    2096              :     }
    2097              : 
    2098              :     /* Update the catalog if needed. */
    2099          217 :     if (update_tuple)
    2100              :     {
    2101          189 :         tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
    2102              :                                 replaces);
    2103              : 
    2104          189 :         CatalogTupleUpdate(rel, &tup->t_self, tup);
    2105              : 
    2106          189 :         heap_freetuple(tup);
    2107              :     }
    2108              : 
    2109              :     /*
    2110              :      * Try to acquire the connection necessary either for modifying the slot
    2111              :      * or for checking if the remote server permits enabling
    2112              :      * retain_dead_tuples.
    2113              :      *
    2114              :      * This has to be at the end because otherwise, if there is an error while
    2115              :      * doing the database operations, we won't be able to roll back the
    2116              :      * altered slot.
    2117              :      */
    2118          217 :     if (update_failover || update_two_phase || check_pub_rdt)
    2119              :     {
    2120              :         bool        must_use_password;
    2121              :         char       *err;
    2122              :         WalReceiverConn *wrconn;
    2123              : 
    2124              :         /* Load the library providing us libpq calls. */
    2125           13 :         load_file("libpqwalreceiver", false);
    2126              : 
    2127              :         /*
    2128              :          * Try to connect to the publisher, using the new connection string if
    2129              :          * available.
    2130              :          */
    2131           13 :         must_use_password = sub->passwordrequired && !sub->ownersuperuser;
    2132           13 :         wrconn = walrcv_connect(stmt->conninfo ? stmt->conninfo : sub->conninfo,
    2133              :                                 true, true, must_use_password, sub->name,
    2134              :                                 &err);
    2135           13 :         if (!wrconn)
    2136            0 :             ereport(ERROR,
    2137              :                     (errcode(ERRCODE_CONNECTION_FAILURE),
    2138              :                      errmsg("subscription \"%s\" could not connect to the publisher: %s",
    2139              :                             sub->name, err)));
    2140              : 
    2141           13 :         PG_TRY();
    2142              :         {
    2143           13 :             if (retain_dead_tuples)
    2144            9 :                 check_pub_dead_tuple_retention(wrconn);
    2145              : 
    2146           13 :             check_publications_origin_tables(wrconn, sub->publications, false,
    2147              :                                              retain_dead_tuples, origin, NULL, 0,
    2148              :                                              sub->name);
    2149              : 
    2150           13 :             if (update_failover || update_two_phase)
    2151            5 :                 walrcv_alter_slot(wrconn, sub->slotname,
    2152              :                                   update_failover ? &opts.failover : NULL,
    2153              :                                   update_two_phase ? &opts.twophase : NULL);
    2154              :         }
    2155            0 :         PG_FINALLY();
    2156              :         {
    2157           13 :             walrcv_disconnect(wrconn);
    2158              :         }
    2159           13 :         PG_END_TRY();
    2160              :     }
    2161              : 
    2162          217 :     table_close(rel, RowExclusiveLock);
    2163              : 
    2164          217 :     InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
    2165              : 
    2166              :     /* Wake up related replication workers to handle this change quickly. */
    2167          217 :     LogicalRepWorkersWakeupAtCommit(subid);
    2168              : 
    2169          217 :     return myself;
    2170              : }
    2171              : 
    2172              : /*
    2173              :  * Drop a subscription
                      :  *
                      :  * Looks up stmt->subname in the current database; if it does not exist,
                      :  * raises an ERROR, or only a NOTICE when stmt->missing_ok is set.  The
                      :  * caller must own the subscription.
                      :  *
                      :  * The local cleanup (catalog tuple, workers, replication origins,
                      :  * dependency records, relation sync states, stats entry) is done first;
                      :  * the connection to the publisher for dropping the remote slots is made
                      :  * last.  isTopLevel is passed to PreventInTransactionBlock() when the
                      :  * subscription has a slot name.
    2174              :  */
    2175              : void
    2176          136 : DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
    2177              : {
    2178              :     Relation    rel;
    2179              :     ObjectAddress myself;
    2180              :     HeapTuple   tup;
    2181              :     Oid         subid;
    2182              :     Oid         subowner;
    2183              :     Datum       datum;
    2184              :     bool        isnull;
    2185              :     char       *subname;
    2186              :     char       *conninfo;
    2187              :     char       *slotname;
    2188              :     List       *subworkers;
    2189              :     ListCell   *lc;
    2190              :     char        originname[NAMEDATALEN];
    2191          136 :     char       *err = NULL;
    2192          136 :     WalReceiverConn *wrconn = NULL;
    2193              :     Form_pg_subscription form;
    2194              :     List       *rstates;
    2195              :     bool        must_use_password;
    2196              : 
    2197              :     /*
    2198              :      * The launcher may concurrently start a new worker for this subscription.
    2199              :      * During initialization, the worker checks for subscription validity and
    2200              :      * exits if the subscription has already been dropped. See
    2201              :      * InitializeLogRepWorker.
                      :      *
                      :      * pg_subscription itself is opened with RowExclusiveLock here.
    2202              :      */
    2203          136 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    2204              : 
    2205          136 :     tup = SearchSysCache2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
    2206          136 :                           CStringGetDatum(stmt->subname));
    2207              : 
    2208          136 :     if (!HeapTupleIsValid(tup))
    2209              :     {
    2210            6 :         table_close(rel, NoLock);
    2211              : 
    2212            6 :         if (!stmt->missing_ok)
    2213            3 :             ereport(ERROR,
    2214              :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    2215              :                      errmsg("subscription \"%s\" does not exist",
    2216              :                             stmt->subname)));
    2217              :         else
    2218            3 :             ereport(NOTICE,
    2219              :                     (errmsg("subscription \"%s\" does not exist, skipping",
    2220              :                             stmt->subname)));
    2221              : 
    2222           50 :         return;
    2223              :     }
    2224              : 
    2225          130 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    2226          130 :     subid = form->oid;
    2227          130 :     subowner = form->subowner;
    2228          130 :     must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
    2229              : 
    2230              :     /* must be owner of the subscription */
    2231          130 :     if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
    2232            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
    2233            0 :                        stmt->subname);
    2234              : 
    2235              :     /* DROP hook for the subscription being removed */
    2236          130 :     InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
    2237              : 
    2238              :     /*
    2239              :      * Lock the subscription so nobody else can do anything with it (including
    2240              :      * the replication workers).
    2241              :      */
    2242          130 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
    2243              : 
    2244              :     /* Get subname (copied, since the syscache tuple is released below) */
    2245          130 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
    2246              :                                    Anum_pg_subscription_subname);
    2247          130 :     subname = pstrdup(NameStr(*DatumGetName(datum)));
    2248              : 
    2249              :     /* Get conninfo, either from the foreign server or from subconninfo */
    2250          130 :     if (OidIsValid(form->subserver))
    2251              :     {
    2252              :         AclResult   aclresult;
    2253              : 
    2254            7 :         aclresult = object_aclcheck(ForeignServerRelationId, form->subserver,
    2255              :                                     form->subowner, ACL_USAGE);
    2256            7 :         if (aclresult != ACLCHECK_OK)
    2257              :         {
    2258              :             /*
    2259              :              * Unable to generate connection string because permissions on the
    2260              :              * foreign server have been removed. Follow the same logic as an
    2261              :              * unusable subconninfo (which will result in an ERROR later
    2262              :              * unless slot_name = NONE).
                      :              *
                      :              * The message is stored in err and conninfo is left NULL, so no
                      :              * connection attempt is made further down.
    2263              :              */
    2264            6 :             err = psprintf(_("subscription owner \"%s\" does not have permission on foreign server \"%s\""),
    2265              :                            GetUserNameFromId(form->subowner, false),
    2266              :                            ForeignServerName(form->subserver));
    2267            6 :             conninfo = NULL;
    2268              :         }
    2269              :         else
    2270            1 :             conninfo = ForeignServerConnectionString(form->subowner,
    2271              :                                                      form->subserver);
    2272              :     }
    2273              :     else
    2274              :     {
    2275          123 :         datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
    2276              :                                        Anum_pg_subscription_subconninfo);
    2277          123 :         conninfo = TextDatumGetCString(datum);
    2278              :     }
    2279              : 
    2280              :     /* Get slotname; a NULL subslotname leaves slotname set to NULL */
    2281          130 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
    2282              :                             Anum_pg_subscription_subslotname, &isnull);
    2283          130 :     if (!isnull)
    2284           83 :         slotname = pstrdup(NameStr(*DatumGetName(datum)));
    2285              :     else
    2286           47 :         slotname = NULL;
    2287              : 
    2288              :     /*
    2289              :      * Since dropping a replication slot is not transactional, the replication
    2290              :      * slot stays dropped even if the transaction rolls back.  So we cannot
    2291              :      * run DROP SUBSCRIPTION inside a transaction block if dropping the
    2292              :      * replication slot.  Also, in this case, we report a message for dropping
    2293              :      * the subscription to the cumulative stats system.
    2294              :      *
    2295              :      * XXX The command name should really be something like "DROP SUBSCRIPTION
    2296              :      * of a subscription that is associated with a replication slot", but we
    2297              :      * don't have the proper facilities for that.
    2298              :      */
    2299          130 :     if (slotname)
    2300           83 :         PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
    2301              : 
    2302          127 :     ObjectAddressSet(myself, SubscriptionRelationId, subid);
    2303          127 :     EventTriggerSQLDropAddObject(&myself, true, true);
    2304              : 
    2305              :     /* Remove the tuple from catalog. */
    2306          127 :     CatalogTupleDelete(rel, &tup->t_self);
    2307              : 
    2308          127 :     ReleaseSysCache(tup);
    2309              : 
    2310              :     /*
    2311              :      * Stop all the subscription workers immediately.
    2312              :      *
    2313              :      * This is necessary if we are dropping the replication slot, so that the
    2314              :      * slot becomes accessible.
    2315              :      *
    2316              :      * It is also necessary if the subscription is disabled and was disabled
    2317              :      * in the same transaction.  Then the workers haven't seen the disabling
    2318              :      * yet and will still be running, leading to hangs later when we want to
    2319              :      * drop the replication origin.  If the subscription was disabled before
    2320              :      * this transaction, then there shouldn't be any workers left, so this
    2321              :      * won't make a difference.
    2322              :      *
    2323              :      * New workers won't be started because we hold an exclusive lock on the
    2324              :      * subscription till the end of the transaction.
    2325              :      */
    2326          127 :     subworkers = logicalrep_workers_find(subid, false, true);
    2327          208 :     foreach(lc, subworkers)
    2328              :     {
    2329           81 :         LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
    2330              : 
    2331           81 :         logicalrep_worker_stop(w->type, w->subid, w->relid);
    2332              :     }
    2333          127 :     list_free(subworkers);
    2334              : 
    2335              :     /*
    2336              :      * Remove the no-longer-useful entry in the launcher's table of apply
    2337              :      * worker start times.
    2338              :      *
    2339              :      * If this transaction rolls back, the launcher might restart a failed
    2340              :      * apply worker before wal_retrieve_retry_interval milliseconds have
    2341              :      * elapsed, but that's pretty harmless.
    2342              :      */
    2343          127 :     ApplyLauncherForgetWorkerStartTime(subid);
    2344              : 
    2345              :     /*
    2346              :      * Cleanup of tablesync replication origins.
    2347              :      *
    2348              :      * Any READY-state relations would already have dealt with clean-ups.
    2349              :      *
    2350              :      * Note that the state can't change because we have already stopped both
    2351              :      * the apply and tablesync workers and they can't restart because of
    2352              :      * exclusive lock on the subscription.
                      :      *
                      :      * The rstates list fetched here is reused further down when dropping
                      :      * the tablesync slots on the publisher.
    2353              :      */
    2354          127 :     rstates = GetSubscriptionRelations(subid, true, false, true);
    2355          132 :     foreach(lc, rstates)
    2356              :     {
    2357            5 :         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
    2358            5 :         Oid         relid = rstate->relid;
    2359              : 
    2360              :         /* Only clean up resources of tablesync workers; skip invalid relids */
    2361            5 :         if (!OidIsValid(relid))
    2362            0 :             continue;
    2363              : 
    2364              :         /*
    2365              :          * Drop the tablesync's origin tracking if exists.
    2366              :          *
    2367              :          * It is possible that the origin is not yet created for tablesync
    2368              :          * worker so passing missing_ok = true. This can happen for the states
    2369              :          * before SUBREL_STATE_DATASYNC.
    2370              :          */
    2371            5 :         ReplicationOriginNameForLogicalRep(subid, relid, originname,
    2372              :                                            sizeof(originname));
    2373            5 :         replorigin_drop_by_name(originname, true, false);
    2374              :     }
    2375              : 
    2376              :     /* Clean up dependencies */
    2377          127 :     deleteDependencyRecordsFor(SubscriptionRelationId, subid, false);
    2378          127 :     deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
    2379              : 
    2380              :     /* Remove any associated relation synchronization states. */
    2381          127 :     RemoveSubscriptionRel(subid, InvalidOid);
    2382              : 
    2383              :     /* Remove the subscription's own origin tracking if exists. */
    2384          127 :     ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
    2385          127 :     replorigin_drop_by_name(originname, true, false);
    2386              : 
    2387              :     /*
    2388              :      * Tell the cumulative stats system that the subscription is getting
    2389              :      * dropped.
    2390              :      */
    2391          127 :     pgstat_drop_subscription(subid);
    2392              : 
    2393              :     /*
    2394              :      * If there is no slot associated with the subscription and no relation
    2395              :      * states were found above (rstates is empty), we can finish here.
    2396              :      */
    2397          127 :     if (!slotname && rstates == NIL)
    2398              :     {
    2399           47 :         table_close(rel, NoLock);
    2400           47 :         return;
    2401              :     }
    2402              : 
    2403              :     /*
    2404              :      * Try to acquire the connection necessary for dropping slots.
    2405              :      *
    2406              :      * Note: If the slotname is NONE/NULL then we allow the command to finish
    2407              :      * and users need to manually clean up the apply and tablesync worker
    2408              :      * slots later.
    2409              :      *
    2410              :      * This has to be at the end because otherwise, if there is an error while
    2411              :      * doing the database operations, we won't be able to roll back the
    2412              :      * dropped slot.
                      :      *
                      :      * No connection is attempted when conninfo is NULL.  If a slot name is
                      :      * set but no connection is available, the failure is handed to
                      :      * ReportSlotConnectionError() together with err.
    2413              :      */
    2414           80 :     load_file("libpqwalreceiver", false);
    2415              : 
    2416           80 :     if (conninfo)
    2417           77 :         wrconn = walrcv_connect(conninfo, true, true, must_use_password,
    2418              :                                 subname, &err);
    2419              : 
    2420           80 :     if (wrconn == NULL)
    2421              :     {
    2422            3 :         if (!slotname)
    2423              :         {
    2424              :             /* be tidy */
    2425            0 :             list_free(rstates);
    2426            0 :             table_close(rel, NoLock);
    2427            0 :             return;
    2428              :         }
    2429              :         else
    2430              :         {
    2431            3 :             ReportSlotConnectionError(rstates, subid, slotname, err);
    2432              :         }
    2433              :     }
    2434              : 
    2435           77 :     PG_TRY();
    2436              :     {
    2437           82 :         foreach(lc, rstates)
    2438              :         {
    2439            5 :             SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
    2440            5 :             Oid         relid = rstate->relid;
    2441              : 
    2442              :             /* Only clean up resources of tablesync workers; skip invalid relids */
    2443            5 :             if (!OidIsValid(relid))
    2444            0 :                 continue;
    2445              : 
    2446              :             /*
    2447              :              * Drop the tablesync slots associated with removed tables.
    2448              :              *
    2449              :              * For SYNCDONE/READY states, the tablesync slot is known to have
    2450              :              * already been dropped by the tablesync worker.
    2451              :              *
    2452              :              * For other states, there is no certainty, maybe the slot does
    2453              :              * not exist yet. Also, if we fail after removing some of the
    2454              :              * slots, next time, it will again try to drop already dropped
    2455              :              * slots and fail. For these reasons, we allow missing_ok = true
    2456              :              * for the drop.
    2457              :              */
    2458            5 :             if (rstate->state != SUBREL_STATE_SYNCDONE)
    2459              :             {
    2460            5 :                 char        syncslotname[NAMEDATALEN] = {0};
    2461              : 
    2462            5 :                 ReplicationSlotNameForTablesync(subid, relid, syncslotname,
    2463              :                                                 sizeof(syncslotname));
    2464            5 :                 ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
    2465              :             }
    2466              :         }
    2467              : 
    2468           77 :         list_free(rstates);
    2469              : 
    2470              :         /*
    2471              :          * If there is a slot associated with the subscription, then drop the
    2472              :          * replication slot at the publisher.  Unlike the tablesync slots
    2473              :          * above, this drop is done with missing_ok = false.
    2474              :          */
    2475           77 :         if (slotname)
    2476           77 :             ReplicationSlotDropAtPubNode(wrconn, slotname, false);
    2477              :     }
    2478            1 :     PG_FINALLY();
    2479              :     {
    2480           77 :         walrcv_disconnect(wrconn);
    2481              :     }
    2482           77 :     PG_END_TRY();
    2483              : 
    2484           76 :     table_close(rel, NoLock);
    2485              : }
    2485              : 
    2486              : /*
    2487              :  * Drop the replication slot at the publisher node using the replication
    2488              :  * connection.
    2489              :  *
    2490              :  * missing_ok - if true then only issue a LOG message if the slot doesn't
    2491              :  * exist.
    2492              :  */
    2493              : void
    2494          284 : ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
    2495              : {
    2496              :     StringInfoData cmd;
    2497              : 
    2498              :     Assert(wrconn);
    2499              : 
    2500          284 :     load_file("libpqwalreceiver", false);
    2501              : 
    2502          284 :     initStringInfo(&cmd);
    2503          284 :     appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
    2504              : 
    2505          284 :     PG_TRY();
    2506              :     {
    2507              :         WalRcvExecResult *res;
    2508              : 
    2509          284 :         res = walrcv_exec(wrconn, cmd.data, 0, NULL);
    2510              : 
    2511          284 :         if (res->status == WALRCV_OK_COMMAND)
    2512              :         {
    2513              :             /* NOTICE. Success. */
    2514          281 :             ereport(NOTICE,
    2515              :                     (errmsg("dropped replication slot \"%s\" on publisher",
    2516              :                             slotname)));
    2517              :         }
    2518            3 :         else if (res->status == WALRCV_ERROR &&
    2519            2 :                  missing_ok &&
    2520            2 :                  res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
    2521              :         {
    2522              :             /* LOG. Error, but missing_ok = true. */
    2523            2 :             ereport(LOG,
    2524              :                     (errmsg("could not drop replication slot \"%s\" on publisher: %s",
    2525              :                             slotname, res->err)));
    2526              :         }
    2527              :         else
    2528              :         {
    2529              :             /* ERROR. */
    2530            1 :             ereport(ERROR,
    2531              :                     (errcode(ERRCODE_CONNECTION_FAILURE),
    2532              :                      errmsg("could not drop replication slot \"%s\" on publisher: %s",
    2533              :                             slotname, res->err)));
    2534              :         }
    2535              : 
    2536          283 :         walrcv_clear_result(res);
    2537              :     }
    2538            1 :     PG_FINALLY();
    2539              :     {
    2540          284 :         pfree(cmd.data);
    2541              :     }
    2542          284 :     PG_END_TRY();
    2543          283 : }
    2544              : 
    2545              : /*
    2546              :  * Internal workhorse for changing a subscription owner
    2547              :  */
    2548              : static void
    2549            9 : AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
    2550              : {
    2551              :     Form_pg_subscription form;
    2552              :     AclResult   aclresult;
    2553              : 
    2554            9 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    2555              : 
    2556            9 :     if (form->subowner == newOwnerId)
    2557            2 :         return;
    2558              : 
    2559            7 :     if (!object_ownercheck(SubscriptionRelationId, form->oid, GetUserId()))
    2560            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
    2561            0 :                        NameStr(form->subname));
    2562              : 
    2563              :     /*
    2564              :      * Don't allow non-superuser modification of a subscription with
    2565              :      * password_required=false.
    2566              :      */
    2567            7 :     if (!form->subpasswordrequired && !superuser())
    2568            0 :         ereport(ERROR,
    2569              :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    2570              :                  errmsg("password_required=false is superuser-only"),
    2571              :                  errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
    2572              : 
    2573              :     /* Must be able to become new owner */
    2574            7 :     check_can_set_role(GetUserId(), newOwnerId);
    2575              : 
    2576              :     /*
    2577              :      * current owner must have CREATE on database
    2578              :      *
    2579              :      * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
    2580              :      * other object types behave differently (e.g. you can't give a table to a
    2581              :      * user who lacks CREATE privileges on a schema).
    2582              :      */
    2583            4 :     aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
    2584              :                                 GetUserId(), ACL_CREATE);
    2585            4 :     if (aclresult != ACLCHECK_OK)
    2586            0 :         aclcheck_error(aclresult, OBJECT_DATABASE,
    2587            0 :                        get_database_name(MyDatabaseId));
    2588              : 
    2589              :     /*
    2590              :      * If the subscription uses a server, check that the new owner has USAGE
    2591              :      * privileges on the server and that a user mapping exists. Note: does not
    2592              :      * re-check the resulting connection string.
    2593              :      */
    2594            4 :     if (OidIsValid(form->subserver))
    2595              :     {
    2596            0 :         Oid         serverid = form->subserver;
    2597              : 
    2598            0 :         aclresult = object_aclcheck(ForeignServerRelationId, serverid, newOwnerId, ACL_USAGE);
    2599            0 :         if (aclresult != ACLCHECK_OK)
    2600            0 :             ereport(ERROR,
    2601              :                     errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    2602              :                     errmsg("new subscription owner \"%s\" does not have permission on foreign server \"%s\"",
    2603              :                            GetUserNameFromId(newOwnerId, false),
    2604              :                            ForeignServerName(serverid)));
    2605              : 
    2606              :         /* make sure a user mapping exists */
    2607            0 :         GetUserMapping(newOwnerId, serverid);
    2608              :     }
    2609              : 
    2610            4 :     form->subowner = newOwnerId;
    2611            4 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    2612              : 
    2613              :     /* Update owner dependency reference */
    2614            4 :     changeDependencyOnOwner(SubscriptionRelationId,
    2615              :                             form->oid,
    2616              :                             newOwnerId);
    2617              : 
    2618            4 :     InvokeObjectPostAlterHook(SubscriptionRelationId,
    2619              :                               form->oid, 0);
    2620              : 
    2621              :     /* Wake up related background processes to handle this change quickly. */
    2622            4 :     ApplyLauncherWakeupAtCommit();
    2623            4 :     LogicalRepWorkersWakeupAtCommit(form->oid);
    2624              : }
    2625              : 
    2626              : /*
    2627              :  * Change subscription owner -- by name
    2628              :  */
    2629              : ObjectAddress
    2630            9 : AlterSubscriptionOwner(const char *name, Oid newOwnerId)
    2631              : {
    2632              :     Oid         subid;
    2633              :     HeapTuple   tup;
    2634              :     Relation    rel;
    2635              :     ObjectAddress address;
    2636              :     Form_pg_subscription form;
    2637              : 
    2638            9 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    2639              : 
    2640            9 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
    2641              :                               CStringGetDatum(name));
    2642              : 
    2643            9 :     if (!HeapTupleIsValid(tup))
    2644            0 :         ereport(ERROR,
    2645              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2646              :                  errmsg("subscription \"%s\" does not exist", name)));
    2647              : 
    2648            9 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    2649            9 :     subid = form->oid;
    2650              : 
    2651            9 :     AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
    2652              : 
    2653            6 :     ObjectAddressSet(address, SubscriptionRelationId, subid);
    2654              : 
    2655            6 :     heap_freetuple(tup);
    2656              : 
    2657            6 :     table_close(rel, RowExclusiveLock);
    2658              : 
    2659            6 :     return address;
    2660              : }
    2661              : 
    2662              : /*
    2663              :  * Change subscription owner -- by OID
    2664              :  */
    2665              : void
    2666            0 : AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
    2667              : {
    2668              :     HeapTuple   tup;
    2669              :     Relation    rel;
    2670              : 
    2671            0 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    2672              : 
    2673            0 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
    2674              : 
    2675            0 :     if (!HeapTupleIsValid(tup))
    2676            0 :         ereport(ERROR,
    2677              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2678              :                  errmsg("subscription with OID %u does not exist", subid)));
    2679              : 
    2680            0 :     AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
    2681              : 
    2682            0 :     heap_freetuple(tup);
    2683              : 
    2684            0 :     table_close(rel, RowExclusiveLock);
    2685            0 : }
    2686              : 
    2687              : /*
    2688              :  * Check and log a warning if the publisher has subscribed to the same table,
    2689              :  * its partition ancestors (if it's a partition), or its partition children (if
    2690              :  * it's a partitioned table), from some other publishers. This check is
    2691              :  * required in the following scenarios:
    2692              :  *
    2693              :  * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
    2694              :  *    statements with "copy_data = true" and "origin = none":
    2695              :  *    - Warn the user that data with an origin might have been copied.
    2696              :  *    - This check is skipped for tables already added, as incremental sync via
    2697              :  *      WAL allows origin tracking. The list of such tables is in
    2698              :  *      subrel_local_oids.
    2699              :  *
    2700              :  * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
    2701              :  *    statements with "retain_dead_tuples = true" and "origin = any", and for
    2702              :  *    ALTER SUBSCRIPTION statements that modify retain_dead_tuples or origin,
    2703              :  *    or when the publisher's status changes (e.g., due to a connection string
    2704              :  *    update):
    2705              :  *    - Warn the user that only conflict detection info for local changes on
    2706              :  *      the publisher is retained. Data from other origins may lack sufficient
    2707              :  *      details for reliable conflict detection.
    2708              :  *    - See comments atop worker.c for more details.
    2709              :  */
    2710              : static void
    2711          175 : check_publications_origin_tables(WalReceiverConn *wrconn, List *publications,
    2712              :                                  bool copydata, bool retain_dead_tuples,
    2713              :                                  char *origin, Oid *subrel_local_oids,
    2714              :                                  int subrel_count, char *subname)
    2715              : {
    2716              :     WalRcvExecResult *res;
    2717              :     StringInfoData cmd;
    2718              :     TupleTableSlot *slot;
    2719          175 :     Oid         tableRow[1] = {TEXTOID};
    2720          175 :     List       *publist = NIL;
    2721              :     int         i;
    2722              :     bool        check_rdt;
    2723              :     bool        check_table_sync;
    2724          350 :     bool        origin_none = origin &&
    2725          175 :         pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0;
    2726              : 
    2727              :     /*
    2728              :      * Enable retain_dead_tuples checks only when origin is set to 'any',
    2729              :      * since with origin='none' only local changes are replicated to the
    2730              :      * subscriber.
    2731              :      */
    2732          175 :     check_rdt = retain_dead_tuples && !origin_none;
    2733              : 
    2734              :     /*
    2735              :      * Enable table synchronization checks only when origin is 'none', to
    2736              :      * ensure that data from other origins is not inadvertently copied.
    2737              :      */
    2738          175 :     check_table_sync = copydata && origin_none;
    2739              : 
    2740              :     /* retain_dead_tuples and table sync checks occur separately */
    2741              :     Assert(!(check_rdt && check_table_sync));
    2742              : 
    2743              :     /* Return if no checks are required */
    2744          175 :     if (!check_rdt && !check_table_sync)
    2745          161 :         return;
    2746              : 
    2747           14 :     initStringInfo(&cmd);
    2748           14 :     appendStringInfoString(&cmd,
    2749              :                            "SELECT DISTINCT P.pubname AS pubname\n"
    2750              :                            "FROM pg_publication P,\n"
    2751              :                            "     LATERAL pg_get_publication_tables(P.pubname) GPT\n"
    2752              :                            "     JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid OR"
    2753              :                            "     GPT.relid IN (SELECT relid FROM pg_partition_ancestors(PS.srrelid) UNION"
    2754              :                            "                   SELECT relid FROM pg_partition_tree(PS.srrelid))),\n"
    2755              :                            "     pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
    2756              :                            "WHERE C.oid = GPT.relid AND P.pubname IN (");
    2757           14 :     GetPublicationsStr(publications, &cmd, true);
    2758           14 :     appendStringInfoString(&cmd, ")\n");
    2759              : 
    2760              :     /*
    2761              :      * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
    2762              :      * subrel_local_oids contains the list of relation oids that are already
    2763              :      * present on the subscriber. This check should be skipped for these
    2764              :      * tables if checking for table sync scenario. However, when handling the
    2765              :      * retain_dead_tuples scenario, ensure all tables are checked, as some
    2766              :      * existing tables may now include changes from other origins due to newly
    2767              :      * created subscriptions on the publisher.
    2768              :      */
    2769           14 :     if (check_table_sync)
    2770              :     {
    2771           14 :         for (i = 0; i < subrel_count; i++)
    2772              :         {
    2773            4 :             Oid         relid = subrel_local_oids[i];
    2774            4 :             char       *schemaname = get_namespace_name(get_rel_namespace(relid));
    2775            4 :             char       *tablename = get_rel_name(relid);
    2776              : 
    2777            4 :             appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
    2778              :                              schemaname, tablename);
    2779              :         }
    2780              :     }
    2781              : 
    2782           14 :     res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
    2783           14 :     pfree(cmd.data);
    2784              : 
    2785           14 :     if (res->status != WALRCV_OK_TUPLES)
    2786            0 :         ereport(ERROR,
    2787              :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    2788              :                  errmsg("could not receive list of replicated tables from the publisher: %s",
    2789              :                         res->err)));
    2790              : 
    2791              :     /* Process publications. */
    2792           14 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    2793           19 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    2794              :     {
    2795              :         char       *pubname;
    2796              :         bool        isnull;
    2797              : 
    2798            5 :         pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
    2799              :         Assert(!isnull);
    2800              : 
    2801            5 :         ExecClearTuple(slot);
    2802            5 :         publist = list_append_unique(publist, makeString(pubname));
    2803              :     }
    2804              : 
    2805              :     /*
    2806              :      * Log a warning if the publisher has subscribed to the same table from
    2807              :      * some other publisher. We cannot know the origin of data during the
    2808              :      * initial sync. Data origins can be found only from the WAL by looking at
    2809              :      * the origin id.
    2810              :      *
    2811              :      * XXX: For simplicity, we don't check whether the table has any data or
    2812              :      * not. If the table doesn't have any data then we don't need to
    2813              :      * distinguish between data having origin and data not having origin so we
    2814              :      * can avoid logging a warning for table sync scenario.
    2815              :      */
    2816           14 :     if (publist)
    2817              :     {
    2818              :         StringInfoData pubnames;
    2819              : 
    2820              :         /* Prepare the list of publication(s) for warning message. */
    2821            5 :         initStringInfo(&pubnames);
    2822            5 :         GetPublicationsStr(publist, &pubnames, false);
    2823              : 
    2824            5 :         if (check_table_sync)
    2825            4 :             ereport(WARNING,
    2826              :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2827              :                     errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
    2828              :                            subname),
    2829              :                     errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
    2830              :                                      "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
    2831              :                                      list_length(publist), pubnames.data),
    2832              :                     errhint("Verify that initial data copied from the publisher tables did not come from other origins."));
    2833              :         else
    2834            1 :             ereport(WARNING,
    2835              :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2836              :                     errmsg("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins",
    2837              :                            subname),
    2838              :                     errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
    2839              :                                      "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
    2840              :                                      list_length(publist), pubnames.data),
    2841              :                     errhint("Consider using origin = NONE or disabling retain_dead_tuples."));
    2842              :     }
    2843              : 
    2844           14 :     ExecDropSingleTupleTableSlot(slot);
    2845              : 
    2846           14 :     walrcv_clear_result(res);
    2847              : }
    2848              : 
    2849              : /*
    2850              :  * This function is similar to check_publications_origin_tables and serves
    2851              :  * same purpose for sequences.
    2852              :  */
    2853              : static void
    2854          163 : check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications,
    2855              :                                     bool copydata, char *origin,
    2856              :                                     Oid *subrel_local_oids, int subrel_count,
    2857              :                                     char *subname)
    2858              : {
    2859              :     WalRcvExecResult *res;
    2860              :     StringInfoData cmd;
    2861              :     TupleTableSlot *slot;
    2862          163 :     Oid         tableRow[1] = {TEXTOID};
    2863          163 :     List       *publist = NIL;
    2864              : 
    2865              :     /*
    2866              :      * Enable sequence synchronization checks only when origin is 'none' , to
    2867              :      * ensure that sequence data from other origins is not inadvertently
    2868              :      * copied. This check is necessary if the publisher is running PG19 or
    2869              :      * later, where logical replication sequence synchronization is supported.
    2870              :      */
    2871          173 :     if (!copydata || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0 ||
    2872           10 :         walrcv_server_version(wrconn) < 190000)
    2873          153 :         return;
    2874              : 
    2875           10 :     initStringInfo(&cmd);
    2876           10 :     appendStringInfoString(&cmd,
    2877              :                            "SELECT DISTINCT P.pubname AS pubname\n"
    2878              :                            "FROM pg_publication P,\n"
    2879              :                            "     LATERAL pg_get_publication_sequences(P.pubname) GPS\n"
    2880              :                            "     JOIN pg_subscription_rel PS ON (GPS.relid = PS.srrelid),\n"
    2881              :                            "     pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
    2882              :                            "WHERE C.oid = GPS.relid AND P.pubname IN (");
    2883              : 
    2884           10 :     GetPublicationsStr(publications, &cmd, true);
    2885           10 :     appendStringInfoString(&cmd, ")\n");
    2886              : 
    2887              :     /*
    2888              :      * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
    2889              :      * subrel_local_oids contains the list of relations that are already
    2890              :      * present on the subscriber. This check should be skipped as these will
    2891              :      * not be re-synced.
    2892              :      */
    2893           10 :     for (int i = 0; i < subrel_count; i++)
    2894              :     {
    2895            0 :         Oid         relid = subrel_local_oids[i];
    2896            0 :         char       *schemaname = get_namespace_name(get_rel_namespace(relid));
    2897            0 :         char       *seqname = get_rel_name(relid);
    2898              : 
    2899            0 :         appendStringInfo(&cmd,
    2900              :                          "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
    2901              :                          schemaname, seqname);
    2902              :     }
    2903              : 
    2904           10 :     res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
    2905           10 :     pfree(cmd.data);
    2906              : 
    2907           10 :     if (res->status != WALRCV_OK_TUPLES)
    2908            0 :         ereport(ERROR,
    2909              :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    2910              :                  errmsg("could not receive list of replicated sequences from the publisher: %s",
    2911              :                         res->err)));
    2912              : 
    2913              :     /* Process publications. */
    2914           10 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    2915           10 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    2916              :     {
    2917              :         char       *pubname;
    2918              :         bool        isnull;
    2919              : 
    2920            0 :         pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
    2921              :         Assert(!isnull);
    2922              : 
    2923            0 :         ExecClearTuple(slot);
    2924            0 :         publist = list_append_unique(publist, makeString(pubname));
    2925              :     }
    2926              : 
    2927              :     /*
    2928              :      * Log a warning if the publisher has subscribed to the same sequence from
    2929              :      * some other publisher. We cannot know the origin of sequences data
    2930              :      * during the initial sync.
    2931              :      */
    2932           10 :     if (publist)
    2933              :     {
    2934              :         StringInfoData pubnames;
    2935              : 
    2936              :         /* Prepare the list of publication(s) for warning message. */
    2937            0 :         initStringInfo(&pubnames);
    2938            0 :         GetPublicationsStr(publist, &pubnames, false);
    2939              : 
    2940            0 :         ereport(WARNING,
    2941              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2942              :                 errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
    2943              :                        subname),
    2944              :                 errdetail_plural("The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.",
    2945              :                                  "The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.",
    2946              :                                  list_length(publist), pubnames.data),
    2947              :                 errhint("Verify that initial data copied from the publisher sequences did not come from other origins."));
    2948              :     }
    2949              : 
    2950           10 :     ExecDropSingleTupleTableSlot(slot);
    2951              : 
    2952           10 :     walrcv_clear_result(res);
    2953              : }
    2954              : 
    2955              : /*
    2956              :  * Determine whether the retain_dead_tuples can be enabled based on the
    2957              :  * publisher's status.
    2958              :  *
    2959              :  * This option is disallowed if the publisher is running a version earlier
    2960              :  * than PG19, or if the publisher is in recovery (i.e., it is a standby
    2961              :  * server).
    2962              :  *
                      :  * 'wrconn' is the connection to the publisher.  It is used both to read
                      :  * the publisher's server version and to run "SELECT pg_is_in_recovery()"
                      :  * there.
                      :  *
                      :  * Nothing is returned.  An ERROR is raised if the publisher's version is
                      :  * below 190000, if the query does not come back with tuples, if no row can
                      :  * be fetched from the result, or if the publisher reports that it is in
                      :  * recovery.
                      :  *
    2963              :  * See comments atop worker.c for a detailed explanation.
    2964              :  */
    2965              : static void
    2966           12 : check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
    2967              : {
    2968              :     WalRcvExecResult *res;
    2969           12 :     Oid         RecoveryRow[1] = {BOOLOID};
    2970              :     TupleTableSlot *slot;
    2971              :     bool        isnull;
    2972              :     bool        remote_in_recovery;
    2973              : 
    2974           12 :     if (walrcv_server_version(wrconn) < 190000)
    2975            0 :         ereport(ERROR,
    2976              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2977              :                 errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
    2978              : 
    2979           12 :     res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow);
    2980              : 
    2981           12 :     if (res->status != WALRCV_OK_TUPLES)
    2982            0 :         ereport(ERROR,
    2983              :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    2984              :                  errmsg("could not obtain recovery progress from the publisher: %s",
    2985              :                         res->err)));
    2986              : 
    2987           12 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    2988           12 :     if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    2989            0 :         elog(ERROR, "failed to fetch tuple for the recovery progress");
    2990              : 
    2991           12 :     remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull));
                      :     /* A NULL here would otherwise be read silently as "not in recovery". */
                      :     Assert(!isnull);
    2992              : 
    2993           12 :     if (remote_in_recovery)
    2994            0 :         ereport(ERROR,
    2995              :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2996              :                 errmsg("cannot enable retain_dead_tuples if the publisher is in recovery"));
    2997              : 
    2998           12 :     ExecDropSingleTupleTableSlot(slot);
    2999              : 
    3000           12 :     walrcv_clear_result(res);
    3001           12 : }
    3002              : 
    3003              : /*
    3004              :  * Check if the subscriber's configuration is adequate to enable the
    3005              :  * retain_dead_tuples option.
    3006              :  *
    3007              :  * Issue an ERROR if the wal_level does not support the use of replication
    3008              :  * slots when check_guc is set to true.
    3009              :  *
    3010              :  * Issue a WARNING if track_commit_timestamp is not enabled when check_guc is
    3011              :  * set to true. This is only to highlight the importance of enabling
    3012              :  * track_commit_timestamp instead of catching all the misconfigurations, as
    3013              :  * this setting can be adjusted after subscription creation. Without it, the
    3014              :  * apply worker will simply skip conflict detection.
    3015              :  *
    3016              :  * Issue a WARNING or NOTICE if the subscription is disabled and the retention
    3017              :  * is active. Do not raise an ERROR since users can only modify
    3018              :  * retain_dead_tuples for disabled subscriptions. And as long as the
    3019              :  * subscription is enabled promptly, it will not pose issues.
    3020              :  *
    3021              :  * Issue a NOTICE to inform users that max_retention_duration is
    3022              :  * ineffective when retain_dead_tuples is disabled for a subscription. An ERROR
    3023              :  * is not issued because setting max_retention_duration causes no harm,
    3024              :  * even when it is ineffective.
                      :  *
                      :  * Arguments:
                      :  *   check_guc               - when true, the wal_level and
                      :  *                             track_commit_timestamp settings are examined.
                      :  *   sub_disabled            - together with retention_active, decides
                      :  *                             whether the "subscription is disabled"
                      :  *                             message is emitted.
                      :  *   elevel_for_sub_disabled - level of that message; must be NOTICE or
                      :  *                             WARNING (asserted).  The hint suggesting to
                      :  *                             turn retain_dead_tuples off is attached only
                      :  *                             when the level is above NOTICE.
                      :  *   retain_dead_tuples      - selects which group of checks runs: the
                      :  *                             GUC/disabled checks when true, the
                      :  *                             max_retention_duration notice when false.
                      :  *   retention_active        - see sub_disabled.
                      :  *   max_retention_set       - only consulted when retain_dead_tuples is
                      :  *                             false.
    3025              :  */
    3026              : void
    3027          260 : CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
    3028              :                            int elevel_for_sub_disabled,
    3029              :                            bool retain_dead_tuples, bool retention_active,
    3030              :                            bool max_retention_set)
    3031              : {
    3032              :     Assert(elevel_for_sub_disabled == NOTICE ||
    3033              :            elevel_for_sub_disabled == WARNING);
    3034              : 
    3035          260 :     if (retain_dead_tuples)
    3036              :     {
    3037           17 :         if (check_guc && wal_level < WAL_LEVEL_REPLICA)
    3038            0 :             ereport(ERROR,
    3039              :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3040              :                     errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
    3041              :                     errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
    3042              : 
    3043           17 :         if (check_guc && !track_commit_timestamp)
    3044            4 :             ereport(WARNING,
    3045              :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3046              :                     errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
    3047              :                     errhint("Consider setting \"%s\" to true.",
    3048              :                             "track_commit_timestamp"));
    3049              : 
    3050           17 :         if (sub_disabled && retention_active)
    3051            7 :             ereport(elevel_for_sub_disabled,
    3052              :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3053              :                     errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
    3054              :                     (elevel_for_sub_disabled > NOTICE)
    3055              :                     ? errhint("Consider setting %s to false.",
    3056              :                               "retain_dead_tuples") : 0);
    3057              :     }
    3058          243 :     else if (max_retention_set)
    3059              :     {
    3060            3 :         ereport(NOTICE,
    3061              :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3062              :                 errmsg("max_retention_duration is ineffective when retain_dead_tuples is disabled"));
    3063              :     }
    3064          260 : }
    3065              : 
    3066              : /*
    3067              :  * Return true iff 'rv' is a member of the list.
                      :  *
                      :  * The list elements are PublicationRelKind pointers; membership is decided
                      :  * by comparing each element's 'rv' field with the given 'rv' using equal().
                      :  * An empty list yields false.
    3068              :  */
    3069              : static bool
    3070          281 : list_member_rangevar(const List *list, RangeVar *rv)
    3071              : {
    3072         1022 :     foreach_ptr(PublicationRelKind, relinfo, list)
    3073              :     {
    3074          462 :         if (equal(relinfo->rv, rv))
    3075            1 :             return true;
    3076              :     }
    3077              : 
    3078          280 :     return false;
    3079              : }
    3080              : 
    3081              : /*
    3082              :  * Get the list of tables and sequences which belong to specified publications
    3083              :  * on the publisher connection.
    3084              :  *
    3085              :  * Note that we don't support the case where the column list is different for
    3086              :  * the same table in different publications to avoid sending unwanted column
    3087              :  * information for some of the rows. This can happen when both the column
    3088              :  * list and row filter are specified for different publications.
                      :  *
                      :  * 'wrconn' is the connection to the publisher and 'publications' the list
                      :  * of publication names, which is turned into the IN ( ... ) list of the
                      :  * query via GetPublicationsStr().
                      :  *
                      :  * The query sent depends on the publisher's version:
                      :  *   - 16 and later: pg_get_publication_tables() joined with pg_class and
                      :  *     pg_namespace, with the column list fetched as an int2vector;
                      :  *   - 19 and later: additionally a UNION ALL branch reading
                      :  *     pg_publication_sequences;
                      :  *   - before 16: the pg_publication_tables view, with the attnames column
                      :  *     requested only from version 15 on (three result columns otherwise).
                      :  *
                      :  * The result is a list of palloc'd PublicationRelKind entries, each holding
                      :  * a RangeVar (schema and relation name) and the relkind reported by the
                      :  * publisher.  The column list itself is not stored; on publishers that
                      :  * report it, a non-sequence relation showing up a second time in the
                      :  * SELECT DISTINCT output (presumably because the column lists differ) is
                      :  * rejected with an ERROR.  An ERROR is also raised if the query does not
                      :  * come back with tuples.
    3089              :  */
    3090              : static List *
    3091          162 : fetch_relation_list(WalReceiverConn *wrconn, List *publications)
    3092              : {
    3093              :     WalRcvExecResult *res;
    3094              :     StringInfoData cmd;
    3095              :     TupleTableSlot *slot;
    3096          162 :     Oid         tableRow[4] = {TEXTOID, TEXTOID, CHAROID, InvalidOid};
    3097          162 :     List       *relationlist = NIL;
    3098          162 :     int         server_version = walrcv_server_version(wrconn);
    3099          162 :     bool        check_columnlist = (server_version >= 150000);
    3100          162 :     int         column_count = check_columnlist ? 4 : 3;
    3101              :     StringInfoData pub_names;
    3102              : 
    3103          162 :     initStringInfo(&cmd);
    3104          162 :     initStringInfo(&pub_names);
    3105              : 
    3106              :     /* Build the pub_names comma-separated string. */
    3107          162 :     GetPublicationsStr(publications, &pub_names, true);
    3108              : 
    3109              :     /* Get the list of relations from the publisher */
    3110          162 :     if (server_version >= 160000)
    3111              :     {
    3112          162 :         tableRow[3] = INT2VECTOROID;
    3113              : 
    3114              :         /*
    3115              :          * From version 16, we allowed passing multiple publications to the
    3116              :          * function pg_get_publication_tables. This helped to filter out the
    3117              :          * partition table whose ancestor is also published in this
    3118              :          * publication array.
    3119              :          *
    3120              :          * Join pg_get_publication_tables with pg_publication to exclude
    3121              :          * non-existing publications.
    3122              :          *
    3123              :          * Note that attrs are always stored in sorted order so we don't need
    3124              :          * to worry if different publications have specified them in a
    3125              :          * different order. See pub_collist_validate.
    3126              :          */
    3127          162 :         appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, c.relkind, gpt.attrs\n"
    3128              :                          "   FROM pg_class c\n"
    3129              :                          "         JOIN pg_namespace n ON n.oid = c.relnamespace\n"
    3130              :                          "         JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
    3131              :                          "                FROM pg_publication\n"
    3132              :                          "                WHERE pubname IN ( %s )) AS gpt\n"
    3133              :                          "             ON gpt.relid = c.oid\n",
    3134              :                          pub_names.data);
    3135              : 
    3136              :         /* From version 19, inclusion of sequences in the target is supported */
    3137          162 :         if (server_version >= 190000)
    3138          162 :             appendStringInfo(&cmd,
    3139              :                              "UNION ALL\n"
    3140              :                              "  SELECT DISTINCT s.schemaname, s.sequencename, " CppAsString2(RELKIND_SEQUENCE) "::\"char\" AS relkind, NULL::int2vector AS attrs\n"
    3141              :                              "  FROM pg_catalog.pg_publication_sequences s\n"
    3142              :                              "  WHERE s.pubname IN ( %s )",
    3143              :                              pub_names.data);
    3144              :     }
    3145              :     else
    3146              :     {
    3147            0 :         tableRow[3] = NAMEARRAYOID;
    3148            0 :         appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, " CppAsString2(RELKIND_RELATION) "::\"char\" AS relkind \n");
    3149              : 
    3150              :         /* Get column lists for each relation if the publisher supports it */
    3151            0 :         if (check_columnlist)
    3152            0 :             appendStringInfoString(&cmd, ", t.attnames\n");
    3153              : 
    3154            0 :         appendStringInfo(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
    3155              :                          " WHERE t.pubname IN ( %s )",
    3156              :                          pub_names.data);
    3157              :     }
    3158              : 
    3159          162 :     pfree(pub_names.data);
    3160              : 
    3161          162 :     res = walrcv_exec(wrconn, cmd.data, column_count, tableRow);
    3162          162 :     pfree(cmd.data);
    3163              : 
    3164          162 :     if (res->status != WALRCV_OK_TUPLES)
    3165            0 :         ereport(ERROR,
    3166              :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    3167              :                  errmsg("could not receive list of replicated tables from the publisher: %s",
    3168              :                         res->err)));
    3169              : 
    3170              :     /* Process the returned rows: tables and, from version 19, sequences. */
    3171          162 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    3172          459 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    3173              :     {
    3174              :         char       *nspname;
    3175              :         char       *relname;
    3176              :         bool        isnull;
    3177              :         char        relkind;
    3178          298 :         PublicationRelKind *relinfo = palloc_object(PublicationRelKind);
    3179              : 
    3180          298 :         nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
    3181              :         Assert(!isnull);
    3182          298 :         relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
    3183              :         Assert(!isnull);
    3184          298 :         relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
    3185              :         Assert(!isnull);
    3186              : 
    3187          298 :         relinfo->rv = makeRangeVar(nspname, relname, -1);
    3188          298 :         relinfo->relkind = relkind;
    3189              : 
    3190          298 :         if (relkind != RELKIND_SEQUENCE &&
    3191          281 :             check_columnlist &&
    3192          281 :             list_member_rangevar(relationlist, relinfo->rv))
    3193            1 :             ereport(ERROR,
    3194              :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3195              :                     errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
    3196              :                            nspname, relname));
    3197              :         else
    3198          297 :             relationlist = lappend(relationlist, relinfo);
    3199              : 
    3200          297 :         ExecClearTuple(slot);
    3201              :     }
    3202          161 :     ExecDropSingleTupleTableSlot(slot);
    3203              : 
    3204          161 :     walrcv_clear_result(res);
    3205              : 
    3206          161 :     return relationlist;
    3207              : }
    3208              : 
    3209              : /*
    3210              :  * This is to report the connection failure while dropping replication slots.
    3211              :  * Here, we report the WARNING for all tablesync slots so that user can drop
    3212              :  * them manually, if required.
                      :  *
                      :  * 'rstates' is a list of SubscriptionRelState entries.  Entries without a
                      :  * valid relid are skipped; for every remaining entry whose state is not
                      :  * SUBREL_STATE_SYNCDONE, the tablesync slot name is derived from 'subid'
                      :  * and the relid and reported in a WARNING.
                      :  *
                      :  * 'slotname' and 'err' are only used in the final message: after the loop
                      :  * the function always raises an ERROR that names the slot, includes the
                      :  * connection error text and hints at how to detach the subscription from
                      :  * the slot.
    3213              :  */
    3214              : static void
    3215            3 : ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
    3216              : {
    3217              :     ListCell   *lc;
    3218              : 
    3219            3 :     foreach(lc, rstates)
    3220              :     {
    3221            0 :         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
    3222            0 :         Oid         relid = rstate->relid;
    3223              : 
    3224              :         /* Only cleanup resources of tablesync workers */
    3225            0 :         if (!OidIsValid(relid))
    3226            0 :             continue;
    3227              : 
    3228              :         /*
    3229              :          * Caller needs to ensure that relstate doesn't change underneath us.
    3230              :          * See DropSubscription where we get the relstates.
    3231              :          */
    3232            0 :         if (rstate->state != SUBREL_STATE_SYNCDONE)
    3233              :         {
    3234            0 :             char        syncslotname[NAMEDATALEN] = {0};
    3235              : 
    3236            0 :             ReplicationSlotNameForTablesync(subid, relid, syncslotname,
    3237              :                                             sizeof(syncslotname));
    3238            0 :             elog(WARNING, "could not drop tablesync replication slot \"%s\"",
    3239              :                  syncslotname);
    3240              :         }
    3241              :     }
    3242              : 
    3243            3 :     ereport(ERROR,
    3244              :             (errcode(ERRCODE_CONNECTION_FAILURE),
    3245              :              errmsg("could not connect to publisher when attempting to drop replication slot \"%s\": %s",
    3246              :                     slotname, err),
    3247              :     /* translator: %s is an SQL ALTER command */
    3248              :              errhint("Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
    3249              :                      "ALTER SUBSCRIPTION ... DISABLE",
    3250              :                      "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
    3251              : }
    3252              : 
    3253              : /*
    3254              :  * Check for duplicates in the given list of publications and error out if
    3255              :  * found one.  Add publications to datums as text datums, if datums is not
    3256              :  * NULL.
                      :  *
                      :  * Each name is compared (with strcmp, so case-sensitively) against the
                      :  * names that precede it in the list; the inner loop stops when it reaches
                      :  * the current cell.  The first repeated name raises an ERROR with
                      :  * ERRCODE_DUPLICATE_OBJECT.
                      :  *
                      :  * When 'datums' is given, one text datum is written per publication, in
                      :  * list order starting at index 0, so the array needs at least as many
                      :  * slots as 'publist' has elements.
    3257              :  */
    3258              : static void
    3259          234 : check_duplicates_in_publist(List *publist, Datum *datums)
    3260              : {
    3261              :     ListCell   *cell;
    3262          234 :     int         j = 0;
    3263              : 
    3264          533 :     foreach(cell, publist)
    3265              :     {
    3266          308 :         char       *name = strVal(lfirst(cell));
    3267              :         ListCell   *pcell;
    3268              : 
    3269          460 :         foreach(pcell, publist)
    3270              :         {
    3271          460 :             char       *pname = strVal(lfirst(pcell));
    3272              : 
    3273          460 :             if (pcell == cell)
    3274          299 :                 break;
    3275              : 
    3276          161 :             if (strcmp(name, pname) == 0)
    3277            9 :                 ereport(ERROR,
    3278              :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
    3279              :                          errmsg("publication name \"%s\" used more than once",
    3280              :                                 pname)));
    3281              :         }
    3282              : 
    3283          299 :         if (datums)
    3284          256 :             datums[j++] = CStringGetTextDatum(name);
    3285              :     }
    3286          225 : }
    3287              : 
    3288              : /*
    3289              :  * Merge current subscription's publications and user-specified publications
    3290              :  * from ADD/DROP PUBLICATIONS.
    3291              :  *
    3292              :  * If addpub is true, we will add the list of publications into oldpublist.
    3293              :  * Otherwise, we will delete the list of publications from oldpublist.  The
    3294              :  * returned list is a copy, oldpublist itself is not changed.
    3295              :  *
    3296              :  * subname is the subscription name, for error messages.
                      :  *
                      :  * An ERROR is raised when:
                      :  *   - newpublist itself contains a name more than once (checked up front
                      :  *     via check_duplicates_in_publist);
                      :  *   - addpub is true and a name is already present in oldpublist;
                      :  *   - addpub is false and a name is not present in oldpublist;
                      :  *   - the resulting list would be empty.
    3297              :  */
    3298              : static List *
    3299           27 : merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
    3300              : {
    3301              :     ListCell   *lc;
    3302              : 
    3303           27 :     oldpublist = list_copy(oldpublist);
    3304              : 
    3305           27 :     check_duplicates_in_publist(newpublist, NULL);
    3306              : 
    3307           46 :     foreach(lc, newpublist)
    3308              :     {
    3309           34 :         char       *name = strVal(lfirst(lc));
    3310              :         ListCell   *lc2;
    3311           34 :         bool        found = false;
    3312              : 
    3313           67 :         foreach(lc2, oldpublist)
    3314              :         {
    3315           55 :             char       *pubname = strVal(lfirst(lc2));
    3316              : 
    3317           55 :             if (strcmp(name, pubname) == 0)
    3318              :             {
    3319           22 :                 found = true;
    3320           22 :                 if (addpub)
    3321            6 :                     ereport(ERROR,
    3322              :                             (errcode(ERRCODE_DUPLICATE_OBJECT),
    3323              :                              errmsg("publication \"%s\" is already in subscription \"%s\"",
    3324              :                                     name, subname)));
    3325              :                 else
    3326           16 :                     oldpublist = foreach_delete_current(oldpublist, lc2);
    3327              : 
    3328           16 :                 break;
    3329              :             }
    3330              :         }
    3331              : 
    3332           28 :         if (addpub && !found)
    3333            9 :             oldpublist = lappend(oldpublist, makeString(name));
    3334           19 :         else if (!addpub && !found)
    3335            3 :             ereport(ERROR,
    3336              :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    3337              :                      errmsg("publication \"%s\" is not in subscription \"%s\"",
    3338              :                             name, subname)));
    3339              :     }
    3340              : 
    3341              :     /*
    3342              :      * XXX Probably no strong reason for this, but for now it's to make ALTER
    3343              :      * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
    3344              :      */
    3345           12 :     if (!oldpublist)
    3346            3 :         ereport(ERROR,
    3347              :                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    3348              :                  errmsg("cannot drop all the publications from a subscription")));
    3349              : 
    3350            9 :     return oldpublist;
    3351              : }
    3352              : 
    3353              : /*
    3354              :  * Extract the streaming mode value from a DefElem.  This is like
    3355              :  * defGetBoolean() but also accepts the special value of "parallel".
                      :  *
                      :  * Returns LOGICALREP_STREAM_ON when no value was given, for the integer 1,
                      :  * and for the strings "true" / "on"; LOGICALREP_STREAM_OFF for the integer
                      :  * 0 and for "false" / "off"; LOGICALREP_STREAM_PARALLEL for "parallel".
                      :  * String comparison is case-insensitive (pg_strcasecmp).
                      :  *
                      :  * Any other value raises an ERROR with ERRCODE_SYNTAX_ERROR that names the
                      :  * option (def->defname).
    3356              :  */
    3357              : char
    3358          454 : defGetStreamingMode(DefElem *def)
    3359              : {
    3360              :     /*
    3361              :      * If no parameter value given, assume "true" is meant.
    3362              :      */
    3363          454 :     if (!def->arg)
    3364            0 :         return LOGICALREP_STREAM_ON;
    3365              : 
    3366              :     /*
    3367              :      * Allow 0, 1, "false", "true", "off", "on" or "parallel".
    3368              :      */
    3369          454 :     switch (nodeTag(def->arg))
    3370              :     {
    3371            0 :         case T_Integer:
    3372            0 :             switch (intVal(def->arg))
    3373              :             {
    3374            0 :                 case 0:
    3375            0 :                     return LOGICALREP_STREAM_OFF;
    3376            0 :                 case 1:
    3377            0 :                     return LOGICALREP_STREAM_ON;
    3378            0 :                 default:
    3379              :                     /* otherwise, error out below */
    3380            0 :                     break;
    3381              :             }
    3382            0 :             break;
    3383          454 :         default:
    3384              :             {
    3385          454 :                 char       *sval = defGetString(def);
    3386              : 
    3387              :                 /*
    3388              :                  * The set of strings accepted here should match up with the
    3389              :                  * grammar's opt_boolean_or_string production.
    3390              :                  */
    3391          905 :                 if (pg_strcasecmp(sval, "false") == 0 ||
    3392          451 :                     pg_strcasecmp(sval, "off") == 0)
    3393            6 :                     return LOGICALREP_STREAM_OFF;
    3394          887 :                 if (pg_strcasecmp(sval, "true") == 0 ||
    3395          439 :                     pg_strcasecmp(sval, "on") == 0)
    3396           45 :                     return LOGICALREP_STREAM_ON;
    3397          403 :                 if (pg_strcasecmp(sval, "parallel") == 0)
    3398          400 :                     return LOGICALREP_STREAM_PARALLEL;
    3399              :             }
    3400            3 :             break;
    3401              :     }
    3402              : 
    3403            3 :     ereport(ERROR,
    3404              :             (errcode(ERRCODE_SYNTAX_ERROR),
    3405              :              errmsg("%s requires a Boolean value or \"parallel\"",
    3406              :                     def->defname)));
    3407              :     return LOGICALREP_STREAM_OFF;   /* keep compiler quiet */
    3408              : }
        

Generated by: LCOV version 2.0-1