LCOV - code coverage report
Current view: top level - src/backend/commands - subscriptioncmds.c (source / functions) Coverage Total Hit
Test: PostgreSQL 20devel Lines: 89.1 % 1210 1078
Test Date: 2026-07-03 19:57:34 Functions: 96.3 % 27 26
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
Branches: 73.0 % 1016 742

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  *
       3                 :             :  * subscriptioncmds.c
       4                 :             :  *      subscription catalog manipulation functions
       5                 :             :  *
       6                 :             :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7                 :             :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :             :  *
       9                 :             :  * IDENTIFICATION
      10                 :             :  *      src/backend/commands/subscriptioncmds.c
      11                 :             :  *
      12                 :             :  *-------------------------------------------------------------------------
      13                 :             :  */
      14                 :             : 
      15                 :             : #include "postgres.h"
      16                 :             : 
      17                 :             : #include "access/commit_ts.h"
      18                 :             : #include "access/htup_details.h"
      19                 :             : #include "access/table.h"
      20                 :             : #include "access/twophase.h"
      21                 :             : #include "access/xact.h"
      22                 :             : #include "catalog/catalog.h"
      23                 :             : #include "catalog/dependency.h"
      24                 :             : #include "catalog/indexing.h"
      25                 :             : #include "catalog/namespace.h"
      26                 :             : #include "catalog/objectaccess.h"
      27                 :             : #include "catalog/objectaddress.h"
      28                 :             : #include "catalog/pg_authid_d.h"
      29                 :             : #include "catalog/pg_database_d.h"
      30                 :             : #include "catalog/pg_foreign_server.h"
      31                 :             : #include "catalog/pg_namespace.h"
      32                 :             : #include "catalog/pg_subscription.h"
      33                 :             : #include "catalog/pg_subscription_rel.h"
      34                 :             : #include "catalog/pg_type.h"
      35                 :             : #include "catalog/pg_user_mapping.h"
      36                 :             : #include "commands/defrem.h"
      37                 :             : #include "commands/event_trigger.h"
      38                 :             : #include "commands/subscriptioncmds.h"
      39                 :             : #include "commands/tablecmds.h"
      40                 :             : #include "executor/executor.h"
      41                 :             : #include "foreign/foreign.h"
      42                 :             : #include "miscadmin.h"
      43                 :             : #include "nodes/makefuncs.h"
      44                 :             : #include "pgstat.h"
      45                 :             : #include "replication/logicallauncher.h"
      46                 :             : #include "replication/logicalworker.h"
      47                 :             : #include "replication/origin.h"
      48                 :             : #include "replication/slot.h"
      49                 :             : #include "replication/walreceiver.h"
      50                 :             : #include "replication/walsender.h"
      51                 :             : #include "replication/worker_internal.h"
      52                 :             : #include "storage/lmgr.h"
      53                 :             : #include "utils/acl.h"
      54                 :             : #include "utils/builtins.h"
      55                 :             : #include "utils/guc.h"
      56                 :             : #include "utils/lsyscache.h"
      57                 :             : #include "utils/memutils.h"
      58                 :             : #include "utils/pg_lsn.h"
      59                 :             : #include "utils/syscache.h"
      60                 :             : 
      61                 :             : /*
      62                 :             :  * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
      63                 :             :  * command.
      64                 :             :  */
      65                 :             : #define SUBOPT_CONNECT              0x00000001
      66                 :             : #define SUBOPT_ENABLED              0x00000002
      67                 :             : #define SUBOPT_CREATE_SLOT          0x00000004
      68                 :             : #define SUBOPT_SLOT_NAME            0x00000008
      69                 :             : #define SUBOPT_COPY_DATA            0x00000010
      70                 :             : #define SUBOPT_SYNCHRONOUS_COMMIT   0x00000020
      71                 :             : #define SUBOPT_REFRESH              0x00000040
      72                 :             : #define SUBOPT_BINARY               0x00000080
      73                 :             : #define SUBOPT_STREAMING            0x00000100
      74                 :             : #define SUBOPT_TWOPHASE_COMMIT      0x00000200
      75                 :             : #define SUBOPT_DISABLE_ON_ERR       0x00000400
      76                 :             : #define SUBOPT_PASSWORD_REQUIRED    0x00000800
      77                 :             : #define SUBOPT_RUN_AS_OWNER         0x00001000
      78                 :             : #define SUBOPT_FAILOVER             0x00002000
      79                 :             : #define SUBOPT_RETAIN_DEAD_TUPLES   0x00004000
      80                 :             : #define SUBOPT_MAX_RETENTION_DURATION   0x00008000
      81                 :             : #define SUBOPT_WAL_RECEIVER_TIMEOUT         0x00010000
      82                 :             : #define SUBOPT_LSN                  0x00020000
      83                 :             : #define SUBOPT_ORIGIN               0x00040000
      84                 :             : #define SUBOPT_CONFLICT_LOG_DEST    0x00080000
      85                 :             : 
      86                 :             : /* check if the 'val' has 'bits' set */
      87                 :             : #define IsSet(val, bits)  (((val) & (bits)) == (bits))
      88                 :             : 
      89                 :             : /*
      90                 :             :  * Structure to hold a bitmap representing the user-provided CREATE/ALTER
      91                 :             :  * SUBSCRIPTION command options and the parsed/default values of each of them.
      92                 :             :  */
      93                 :             : typedef struct SubOpts
      94                 :             : {
      95                 :             :     uint32      specified_opts;
      96                 :             :     char       *slot_name;
      97                 :             :     char       *synchronous_commit;
      98                 :             :     bool        connect;
      99                 :             :     bool        enabled;
     100                 :             :     bool        create_slot;
     101                 :             :     bool        copy_data;
     102                 :             :     bool        refresh;
     103                 :             :     bool        binary;
     104                 :             :     char        streaming;
     105                 :             :     bool        twophase;
     106                 :             :     bool        disableonerr;
     107                 :             :     bool        passwordrequired;
     108                 :             :     bool        runasowner;
     109                 :             :     bool        failover;
     110                 :             :     bool        retaindeadtuples;
     111                 :             :     int32       maxretention;
     112                 :             :     char       *origin;
     113                 :             :     ConflictLogDest conflictlogdest;
     114                 :             :     XLogRecPtr  lsn;
     115                 :             :     char       *wal_receiver_timeout;
     116                 :             : } SubOpts;
     117                 :             : 
     118                 :             : /*
     119                 :             :  * PublicationRelKind represents a relation included in a publication.
     120                 :             :  * It stores the schema-qualified relation name (rv) and its kind (relkind).
     121                 :             :  */
     122                 :             : typedef struct PublicationRelKind
     123                 :             : {
     124                 :             :     RangeVar   *rv;
     125                 :             :     char        relkind;
     126                 :             : } PublicationRelKind;
     127                 :             : 
     128                 :             : static List *fetch_relation_list(WalReceiverConn *wrconn, List *publications);
     129                 :             : static void check_publications_origin_tables(WalReceiverConn *wrconn,
     130                 :             :                                              List *publications, bool copydata,
     131                 :             :                                              bool retain_dead_tuples,
     132                 :             :                                              char *origin,
     133                 :             :                                              Oid *subrel_local_oids,
     134                 :             :                                              int subrel_count, char *subname);
     135                 :             : static void check_publications_origin_sequences(WalReceiverConn *wrconn,
     136                 :             :                                                 List *publications,
     137                 :             :                                                 bool copydata, char *origin,
     138                 :             :                                                 Oid *subrel_local_oids,
     139                 :             :                                                 int subrel_count,
     140                 :             :                                                 char *subname);
     141                 :             : static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn);
     142                 :             : static void check_duplicates_in_publist(List *publist, Datum *datums);
     143                 :             : static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
     144                 :             : static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
     145                 :             : static void CheckAlterSubOption(Subscription *sub, const char *option,
     146                 :             :                                 bool slot_needs_update, bool isTopLevel);
     147                 :             : static bool alter_sub_conflict_log_dest(Subscription *sub,
     148                 :             :                                         ConflictLogDest oldlogdest,
     149                 :             :                                         ConflictLogDest newlogdest,
     150                 :             :                                         Oid *conflicttablerelid);
     151                 :             : static void drop_sub_conflict_log_table(Oid subid, char *subname,
     152                 :             :                                         Oid subconflictlogrelid);
     153                 :             : 
     154                 :             : /*
     155                 :             :  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
     156                 :             :  *
     157                 :             :  * Since not all options can be specified in both commands, this function
     158                 :             :  * will report an error if mutually exclusive options are specified.
     159                 :             :  */
     160                 :             : static void
     161                 :         715 : parse_subscription_options(ParseState *pstate, List *stmt_options,
     162                 :             :                            uint32 supported_opts, SubOpts *opts)
     163                 :             : {
     164                 :             :     ListCell   *lc;
     165                 :             : 
     166                 :             :     /* Start out with cleared opts. */
     167                 :         715 :     memset(opts, 0, sizeof(SubOpts));
     168                 :             : 
     169                 :             :     /* caller must expect some option */
     170                 :             :     Assert(supported_opts != 0);
     171                 :             : 
     172                 :             :     /* If connect option is supported, these others also need to be. */
     173                 :             :     Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
     174                 :             :            IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
     175                 :             :                  SUBOPT_COPY_DATA));
     176                 :             : 
     177                 :             :     /* Set default values for the supported options. */
     178         [ +  + ]:         715 :     if (IsSet(supported_opts, SUBOPT_CONNECT))
     179                 :         337 :         opts->connect = true;
     180         [ +  + ]:         715 :     if (IsSet(supported_opts, SUBOPT_ENABLED))
     181                 :         416 :         opts->enabled = true;
     182         [ +  + ]:         715 :     if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
     183                 :         337 :         opts->create_slot = true;
     184         [ +  + ]:         715 :     if (IsSet(supported_opts, SUBOPT_COPY_DATA))
     185                 :         428 :         opts->copy_data = true;
     186         [ +  + ]:         715 :     if (IsSet(supported_opts, SUBOPT_REFRESH))
     187                 :          54 :         opts->refresh = true;
     188         [ +  + ]:         715 :     if (IsSet(supported_opts, SUBOPT_BINARY))
     189                 :         530 :         opts->binary = false;
     190         [ +  + ]:         715 :     if (IsSet(supported_opts, SUBOPT_STREAMING))
     191                 :         530 :         opts->streaming = LOGICALREP_STREAM_PARALLEL;
     192         [ +  + ]:         715 :     if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
     193                 :         530 :         opts->twophase = false;
     194         [ +  + ]:         715 :     if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
     195                 :         530 :         opts->disableonerr = false;
     196         [ +  + ]:         715 :     if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED))
     197                 :         530 :         opts->passwordrequired = true;
     198         [ +  + ]:         715 :     if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER))
     199                 :         530 :         opts->runasowner = false;
     200         [ +  + ]:         715 :     if (IsSet(supported_opts, SUBOPT_FAILOVER))
     201                 :         530 :         opts->failover = false;
     202         [ +  + ]:         715 :     if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES))
     203                 :         530 :         opts->retaindeadtuples = false;
     204         [ +  + ]:         715 :     if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION))
     205                 :         530 :         opts->maxretention = 0;
     206         [ +  + ]:         715 :     if (IsSet(supported_opts, SUBOPT_ORIGIN))
     207                 :         530 :         opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
     208         [ +  + ]:         715 :     if (IsSet(supported_opts, SUBOPT_CONFLICT_LOG_DEST))
     209                 :         530 :         opts->conflictlogdest = CONFLICT_LOG_DEST_LOG;
     210                 :             : 
     211                 :             :     /* Parse options */
     212   [ +  +  +  +  :        1453 :     foreach(lc, stmt_options)
                   +  + ]
     213                 :             :     {
     214                 :         802 :         DefElem    *defel = (DefElem *) lfirst(lc);
     215                 :             : 
     216         [ +  + ]:         802 :         if (IsSet(supported_opts, SUBOPT_CONNECT) &&
     217         [ +  + ]:         464 :             strcmp(defel->defname, "connect") == 0)
     218                 :             :         {
     219         [ -  + ]:         174 :             if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
     220                 :           0 :                 errorConflictingDefElem(defel, pstate);
     221                 :             : 
     222                 :         174 :             opts->specified_opts |= SUBOPT_CONNECT;
     223                 :         174 :             opts->connect = defGetBoolean(defel);
     224                 :             :         }
     225         [ +  + ]:         628 :         else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
     226         [ +  + ]:         369 :                  strcmp(defel->defname, "enabled") == 0)
     227                 :             :         {
     228         [ -  + ]:         103 :             if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
     229                 :           0 :                 errorConflictingDefElem(defel, pstate);
     230                 :             : 
     231                 :         103 :             opts->specified_opts |= SUBOPT_ENABLED;
     232                 :         103 :             opts->enabled = defGetBoolean(defel);
     233                 :             :         }
     234         [ +  + ]:         525 :         else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
     235         [ +  + ]:         266 :                  strcmp(defel->defname, "create_slot") == 0)
     236                 :             :         {
     237         [ -  + ]:          25 :             if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
     238                 :           0 :                 errorConflictingDefElem(defel, pstate);
     239                 :             : 
     240                 :          25 :             opts->specified_opts |= SUBOPT_CREATE_SLOT;
     241                 :          25 :             opts->create_slot = defGetBoolean(defel);
     242                 :             :         }
     243         [ +  + ]:         500 :         else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
     244         [ +  + ]:         436 :                  strcmp(defel->defname, "slot_name") == 0)
     245                 :             :         {
     246         [ -  + ]:         146 :             if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
     247                 :           0 :                 errorConflictingDefElem(defel, pstate);
     248                 :             : 
     249                 :         146 :             opts->specified_opts |= SUBOPT_SLOT_NAME;
     250                 :         146 :             opts->slot_name = defGetString(defel);
     251                 :             : 
     252                 :             :             /* Setting slot_name = NONE is treated as no slot name. */
     253         [ +  + ]:         288 :             if (strcmp(opts->slot_name, "none") == 0)
     254                 :         118 :                 opts->slot_name = NULL;
     255                 :             :             else
     256                 :          28 :                 ReplicationSlotValidateName(opts->slot_name, false, ERROR);
     257                 :             :         }
     258         [ +  + ]:         354 :         else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
     259         [ +  + ]:         220 :                  strcmp(defel->defname, "copy_data") == 0)
     260                 :             :         {
     261         [ -  + ]:          31 :             if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
     262                 :           0 :                 errorConflictingDefElem(defel, pstate);
     263                 :             : 
     264                 :          31 :             opts->specified_opts |= SUBOPT_COPY_DATA;
     265                 :          31 :             opts->copy_data = defGetBoolean(defel);
     266                 :             :         }
     267         [ +  + ]:         323 :         else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
     268         [ +  + ]:         264 :                  strcmp(defel->defname, "synchronous_commit") == 0)
     269                 :             :         {
     270         [ -  + ]:           8 :             if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
     271                 :           0 :                 errorConflictingDefElem(defel, pstate);
     272                 :             : 
     273                 :           8 :             opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
     274                 :           8 :             opts->synchronous_commit = defGetString(defel);
     275                 :             : 
     276                 :             :             /* Test if the given value is valid for synchronous_commit GUC. */
     277                 :           8 :             (void) set_config_option("synchronous_commit", opts->synchronous_commit,
     278                 :             :                                      PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
     279                 :             :                                      false, 0, false);
     280                 :             :         }
     281         [ +  + ]:         315 :         else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
     282         [ +  - ]:          44 :                  strcmp(defel->defname, "refresh") == 0)
     283                 :             :         {
     284         [ -  + ]:          44 :             if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
     285                 :           0 :                 errorConflictingDefElem(defel, pstate);
     286                 :             : 
     287                 :          44 :             opts->specified_opts |= SUBOPT_REFRESH;
     288                 :          44 :             opts->refresh = defGetBoolean(defel);
     289                 :             :         }
     290         [ +  + ]:         271 :         else if (IsSet(supported_opts, SUBOPT_BINARY) &&
     291         [ +  + ]:         256 :                  strcmp(defel->defname, "binary") == 0)
     292                 :             :         {
     293         [ -  + ]:          19 :             if (IsSet(opts->specified_opts, SUBOPT_BINARY))
     294                 :           0 :                 errorConflictingDefElem(defel, pstate);
     295                 :             : 
     296                 :          19 :             opts->specified_opts |= SUBOPT_BINARY;
     297                 :          19 :             opts->binary = defGetBoolean(defel);
     298                 :             :         }
     299         [ +  + ]:         252 :         else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
     300         [ +  + ]:         237 :                  strcmp(defel->defname, "streaming") == 0)
     301                 :             :         {
     302         [ -  + ]:          43 :             if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
     303                 :           0 :                 errorConflictingDefElem(defel, pstate);
     304                 :             : 
     305                 :          43 :             opts->specified_opts |= SUBOPT_STREAMING;
     306                 :          43 :             opts->streaming = defGetStreamingMode(defel);
     307                 :             :         }
     308         [ +  + ]:         209 :         else if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
     309         [ +  + ]:         194 :                  strcmp(defel->defname, "two_phase") == 0)
     310                 :             :         {
     311         [ -  + ]:          24 :             if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
     312                 :           0 :                 errorConflictingDefElem(defel, pstate);
     313                 :             : 
     314                 :          24 :             opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
     315                 :          24 :             opts->twophase = defGetBoolean(defel);
     316                 :             :         }
     317         [ +  + ]:         185 :         else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
     318         [ +  + ]:         170 :                  strcmp(defel->defname, "disable_on_error") == 0)
     319                 :             :         {
     320         [ -  + ]:          13 :             if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
     321                 :           0 :                 errorConflictingDefElem(defel, pstate);
     322                 :             : 
     323                 :          13 :             opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
     324                 :          13 :             opts->disableonerr = defGetBoolean(defel);
     325                 :             :         }
     326         [ +  + ]:         172 :         else if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED) &&
     327         [ +  + ]:         157 :                  strcmp(defel->defname, "password_required") == 0)
     328                 :             :         {
     329         [ -  + ]:          16 :             if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
     330                 :           0 :                 errorConflictingDefElem(defel, pstate);
     331                 :             : 
     332                 :          16 :             opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
     333                 :          16 :             opts->passwordrequired = defGetBoolean(defel);
     334                 :             :         }
     335         [ +  + ]:         156 :         else if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER) &&
     336         [ +  + ]:         141 :                  strcmp(defel->defname, "run_as_owner") == 0)
     337                 :             :         {
     338         [ -  + ]:          11 :             if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
     339                 :           0 :                 errorConflictingDefElem(defel, pstate);
     340                 :             : 
     341                 :          11 :             opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
     342                 :          11 :             opts->runasowner = defGetBoolean(defel);
     343                 :             :         }
     344         [ +  + ]:         145 :         else if (IsSet(supported_opts, SUBOPT_FAILOVER) &&
     345         [ +  + ]:         130 :                  strcmp(defel->defname, "failover") == 0)
     346                 :             :         {
     347         [ -  + ]:          16 :             if (IsSet(opts->specified_opts, SUBOPT_FAILOVER))
     348                 :           0 :                 errorConflictingDefElem(defel, pstate);
     349                 :             : 
     350                 :          16 :             opts->specified_opts |= SUBOPT_FAILOVER;
     351                 :          16 :             opts->failover = defGetBoolean(defel);
     352                 :             :         }
     353         [ +  + ]:         129 :         else if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES) &&
     354         [ +  + ]:         114 :                  strcmp(defel->defname, "retain_dead_tuples") == 0)
     355                 :             :         {
     356         [ -  + ]:          14 :             if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
     357                 :           0 :                 errorConflictingDefElem(defel, pstate);
     358                 :             : 
     359                 :          14 :             opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
     360                 :          14 :             opts->retaindeadtuples = defGetBoolean(defel);
     361                 :             :         }
     362         [ +  + ]:         115 :         else if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION) &&
     363         [ +  + ]:         100 :                  strcmp(defel->defname, "max_retention_duration") == 0)
     364                 :             :         {
     365         [ -  + ]:          22 :             if (IsSet(opts->specified_opts, SUBOPT_MAX_RETENTION_DURATION))
     366                 :           0 :                 errorConflictingDefElem(defel, pstate);
     367                 :             : 
     368                 :          22 :             opts->specified_opts |= SUBOPT_MAX_RETENTION_DURATION;
     369                 :          22 :             opts->maxretention = defGetInt32(defel);
     370                 :             : 
     371         [ +  + ]:          18 :             if (opts->maxretention < 0)
     372         [ +  - ]:           8 :                 ereport(ERROR,
     373                 :             :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     374                 :             :                         errmsg("max_retention_duration cannot be negative"));
     375                 :             :         }
     376         [ +  + ]:          93 :         else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
     377         [ +  + ]:          78 :                  strcmp(defel->defname, "origin") == 0)
     378                 :             :         {
     379         [ -  + ]:          24 :             if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
     380                 :           0 :                 errorConflictingDefElem(defel, pstate);
     381                 :             : 
     382                 :          24 :             opts->specified_opts |= SUBOPT_ORIGIN;
     383                 :          24 :             pfree(opts->origin);
     384                 :             : 
     385                 :             :             /*
     386                 :             :              * Even though the "origin" parameter allows only "none" and "any"
     387                 :             :              * values, it is implemented as a string type so that the
     388                 :             :              * parameter can be extended in future versions to support
     389                 :             :              * filtering using origin names specified by the user.
     390                 :             :              */
     391                 :          24 :             opts->origin = defGetString(defel);
     392                 :             : 
     393   [ +  +  +  + ]:          34 :             if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
     394                 :          10 :                 (pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
     395         [ +  - ]:           4 :                 ereport(ERROR,
     396                 :             :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     397                 :             :                         errmsg("unrecognized origin value: \"%s\"", opts->origin));
     398                 :             :         }
     399         [ +  + ]:          69 :         else if (IsSet(supported_opts, SUBOPT_LSN) &&
     400         [ +  - ]:          15 :                  strcmp(defel->defname, "lsn") == 0)
     401                 :          11 :         {
     402                 :          15 :             char       *lsn_str = defGetString(defel);
     403                 :             :             XLogRecPtr  lsn;
     404                 :             : 
     405         [ -  + ]:          15 :             if (IsSet(opts->specified_opts, SUBOPT_LSN))
     406                 :           0 :                 errorConflictingDefElem(defel, pstate);
     407                 :             : 
     408                 :             :             /* Setting lsn = NONE is treated as resetting LSN */
     409         [ +  + ]:          15 :             if (strcmp(lsn_str, "none") == 0)
     410                 :           4 :                 lsn = InvalidXLogRecPtr;
     411                 :             :             else
     412                 :             :             {
     413                 :             :                 /* Parse the argument as LSN */
     414                 :          11 :                 lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
     415                 :             :                                                       CStringGetDatum(lsn_str)));
     416                 :             : 
     417         [ +  + ]:          11 :                 if (!XLogRecPtrIsValid(lsn))
     418         [ +  - ]:           4 :                     ereport(ERROR,
     419                 :             :                             (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     420                 :             :                              errmsg("invalid WAL location (LSN): %s", lsn_str)));
     421                 :             :             }
     422                 :             : 
     423                 :          11 :             opts->specified_opts |= SUBOPT_LSN;
     424                 :          11 :             opts->lsn = lsn;
     425                 :             :         }
     426         [ +  - ]:          54 :         else if (IsSet(supported_opts, SUBOPT_WAL_RECEIVER_TIMEOUT) &&
     427         [ +  + ]:          54 :                  strcmp(defel->defname, "wal_receiver_timeout") == 0)
     428                 :           8 :         {
     429                 :             :             bool        parsed;
     430                 :             :             int         val;
     431                 :             : 
     432         [ -  + ]:          12 :             if (IsSet(opts->specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
     433                 :           0 :                 errorConflictingDefElem(defel, pstate);
     434                 :             : 
     435                 :          12 :             opts->specified_opts |= SUBOPT_WAL_RECEIVER_TIMEOUT;
     436                 :          12 :             opts->wal_receiver_timeout = defGetString(defel);
     437                 :             : 
     438                 :             :             /*
     439                 :             :              * Test if the given value is valid for wal_receiver_timeout GUC.
     440                 :             :              * Skip this test if the value is -1, since -1 is allowed for the
     441                 :             :              * wal_receiver_timeout subscription option, but not for the GUC
     442                 :             :              * itself.
     443                 :             :              */
     444                 :          12 :             parsed = parse_int(opts->wal_receiver_timeout, &val, 0, NULL);
     445   [ +  +  -  + ]:          12 :             if (!parsed || val != -1)
     446                 :           8 :                 (void) set_config_option("wal_receiver_timeout", opts->wal_receiver_timeout,
     447                 :             :                                          PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
     448                 :             :                                          false, 0, false);
     449                 :             :         }
     450         [ +  - ]:          42 :         else if (IsSet(supported_opts, SUBOPT_CONFLICT_LOG_DEST) &&
     451         [ +  + ]:          42 :                  strcmp(defel->defname, "conflict_log_destination") == 0)
     452                 :          30 :         {
     453                 :             :             char       *val;
     454                 :             : 
     455         [ -  + ]:          38 :             if (IsSet(opts->specified_opts, SUBOPT_CONFLICT_LOG_DEST))
     456                 :           0 :                 errorConflictingDefElem(defel, pstate);
     457                 :             : 
     458                 :          38 :             val = defGetString(defel);
     459                 :          38 :             opts->conflictlogdest = GetConflictLogDest(val);
     460                 :          30 :             opts->specified_opts |= SUBOPT_CONFLICT_LOG_DEST;
     461                 :             :         }
     462                 :             :         else
     463         [ +  - ]:           4 :             ereport(ERROR,
     464                 :             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     465                 :             :                      errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
     466                 :             :     }
     467                 :             : 
     468                 :             :     /*
     469                 :             :      * We've been explicitly asked to not connect, that requires some
     470                 :             :      * additional processing.
     471                 :             :      */
     472   [ +  +  +  + ]:         651 :     if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
     473                 :             :     {
     474                 :             :         /* Check for incompatible options from the user. */
     475         [ +  - ]:         134 :         if (opts->enabled &&
     476         [ +  + ]:         134 :             IsSet(opts->specified_opts, SUBOPT_ENABLED))
     477         [ +  - ]:           4 :             ereport(ERROR,
     478                 :             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     479                 :             :             /*- translator: both %s are strings of the form "option = value" */
     480                 :             :                      errmsg("%s and %s are mutually exclusive options",
     481                 :             :                             "connect = false", "enabled = true")));
     482                 :             : 
     483         [ +  + ]:         130 :         if (opts->create_slot &&
     484         [ +  + ]:         126 :             IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
     485         [ +  - ]:           4 :             ereport(ERROR,
     486                 :             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     487                 :             :                      errmsg("%s and %s are mutually exclusive options",
     488                 :             :                             "connect = false", "create_slot = true")));
     489                 :             : 
     490         [ +  + ]:         126 :         if (opts->copy_data &&
     491         [ +  + ]:         122 :             IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
     492         [ +  - ]:           4 :             ereport(ERROR,
     493                 :             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     494                 :             :                      errmsg("%s and %s are mutually exclusive options",
     495                 :             :                             "connect = false", "copy_data = true")));
     496                 :             : 
     497                 :             :         /* Change the defaults of other options. */
     498                 :         122 :         opts->enabled = false;
     499                 :         122 :         opts->create_slot = false;
     500                 :         122 :         opts->copy_data = false;
     501                 :             :     }
     502                 :             : 
     503                 :             :     /*
     504                 :             :      * Do additional checking for disallowed combination when slot_name = NONE
     505                 :             :      * was used.
     506                 :             :      */
     507         [ +  + ]:         639 :     if (!opts->slot_name &&
     508         [ +  + ]:         615 :         IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
     509                 :             :     {
     510         [ +  + ]:         114 :         if (opts->enabled)
     511                 :             :         {
     512         [ +  + ]:          12 :             if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
     513         [ +  - ]:           4 :                 ereport(ERROR,
     514                 :             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     515                 :             :                 /*- translator: both %s are strings of the form "option = value" */
     516                 :             :                          errmsg("%s and %s are mutually exclusive options",
     517                 :             :                                 "slot_name = NONE", "enabled = true")));
     518                 :             :             else
     519         [ +  - ]:           8 :                 ereport(ERROR,
     520                 :             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     521                 :             :                 /*- translator: both %s are strings of the form "option = value" */
     522                 :             :                          errmsg("subscription with %s must also set %s",
     523                 :             :                                 "slot_name = NONE", "enabled = false")));
     524                 :             :         }
     525                 :             : 
     526         [ +  + ]:         102 :         if (opts->create_slot)
     527                 :             :         {
     528         [ +  + ]:           8 :             if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
     529         [ +  - ]:           4 :                 ereport(ERROR,
     530                 :             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     531                 :             :                 /*- translator: both %s are strings of the form "option = value" */
     532                 :             :                          errmsg("%s and %s are mutually exclusive options",
     533                 :             :                                 "slot_name = NONE", "create_slot = true")));
     534                 :             :             else
     535         [ +  - ]:           4 :                 ereport(ERROR,
     536                 :             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     537                 :             :                 /*- translator: both %s are strings of the form "option = value" */
     538                 :             :                          errmsg("subscription with %s must also set %s",
     539                 :             :                                 "slot_name = NONE", "create_slot = false")));
     540                 :             :         }
     541                 :             :     }
     542                 :         619 : }
     543                 :             : 
     544                 :             : /*
     545                 :             :  * Append a suitably-quoted identifier or string literal to buf.
     546                 :             :  * "quote" should be either a double-quote or single-quote character.
     547                 :             :  *
     548                 :             :  * Caution: this quoting logic is sufficient for identifiers and literals
     549                 :             :  * in the replication grammar, but not always in regular SQL.  Specifically,
     550                 :             :  * it'd fail for a string literal if standard_conforming_strings is off.
     551                 :             :  */
     552                 :             : static void
     553                 :         291 : appendQuotedString(StringInfo buf, const char *str, char quote)
     554                 :             : {
     555                 :         291 :     appendStringInfoChar(buf, quote);
     556         [ +  + ]:        9272 :     while (*str)
     557                 :             :     {
     558                 :        8981 :         char        c = *str++;
     559                 :             : 
     560         [ -  + ]:        8981 :         if (c == quote)
     561                 :           0 :             appendStringInfoChar(buf, c);
     562                 :        8981 :         appendStringInfoChar(buf, c);
     563                 :             :     }
     564                 :         291 :     appendStringInfoChar(buf, quote);
     565                 :         291 : }
     566                 :             : 
     567                 :             : #define appendQuotedIdentifier(b, s)    appendQuotedString(b, s, '"')
     568                 :             : #define appendQuotedLiteral(b, s)       appendQuotedString(b, s, '\'')
     569                 :             : 
     570                 :             : /*
     571                 :             :  * Check that the specified publications are present on the publisher.
     572                 :             :  */
     573                 :             : static void
     574                 :         135 : check_publications(WalReceiverConn *wrconn, List *publications)
     575                 :             : {
     576                 :             :     WalRcvExecResult *res;
     577                 :             :     StringInfoData cmd;
     578                 :             :     TupleTableSlot *slot;
     579                 :         135 :     List       *publicationsCopy = NIL;
     580                 :         135 :     Oid         tableRow[1] = {TEXTOID};
     581                 :             : 
     582                 :         135 :     initStringInfo(&cmd);
     583                 :         135 :     appendStringInfoString(&cmd, "SELECT t.pubname FROM\n"
     584                 :             :                            " pg_catalog.pg_publication t WHERE\n"
     585                 :             :                            " t.pubname IN (");
     586                 :         135 :     GetPublicationsStr(publications, &cmd, true);
     587                 :         135 :     appendStringInfoChar(&cmd, ')');
     588                 :             : 
     589                 :         135 :     res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
     590                 :         135 :     pfree(cmd.data);
     591                 :             : 
     592         [ -  + ]:         135 :     if (res->status != WALRCV_OK_TUPLES)
     593         [ #  # ]:           0 :         ereport(ERROR,
     594                 :             :                 errmsg("could not receive list of publications from the publisher: %s",
     595                 :             :                        res->err));
     596                 :             : 
     597                 :         135 :     publicationsCopy = list_copy(publications);
     598                 :             : 
     599                 :             :     /* Process publication(s). */
     600                 :         135 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     601         [ +  + ]:         299 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     602                 :             :     {
     603                 :             :         char       *pubname;
     604                 :             :         bool        isnull;
     605                 :             : 
     606                 :         164 :         pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
     607                 :             :         Assert(!isnull);
     608                 :             : 
     609                 :             :         /* Delete the publication present in publisher from the list. */
     610                 :         164 :         publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
     611                 :         164 :         ExecClearTuple(slot);
     612                 :             :     }
     613                 :             : 
     614                 :         135 :     ExecDropSingleTupleTableSlot(slot);
     615                 :             : 
     616                 :         135 :     walrcv_clear_result(res);
     617                 :             : 
     618         [ +  + ]:         135 :     if (list_length(publicationsCopy))
     619                 :             :     {
     620                 :             :         /* Prepare the list of non-existent publication(s) for error message. */
     621                 :             :         StringInfoData pubnames;
     622                 :             : 
     623                 :           4 :         initStringInfo(&pubnames);
     624                 :             : 
     625                 :           4 :         GetPublicationsStr(publicationsCopy, &pubnames, false);
     626         [ +  - ]:           4 :         ereport(WARNING,
     627                 :             :                 errcode(ERRCODE_UNDEFINED_OBJECT),
     628                 :             :                 errmsg_plural("publication %s does not exist on the publisher",
     629                 :             :                               "publications %s do not exist on the publisher",
     630                 :             :                               list_length(publicationsCopy),
     631                 :             :                               pubnames.data));
     632                 :             :     }
     633                 :         135 : }
     634                 :             : 
     635                 :             : /*
     636                 :             :  * Auxiliary function to build a text array out of a list of String nodes.
     637                 :             :  */
     638                 :             : static Datum
     639                 :         250 : publicationListToArray(List *publist)
     640                 :             : {
     641                 :             :     ArrayType  *arr;
     642                 :             :     Datum      *datums;
     643                 :             :     MemoryContext memcxt;
     644                 :             :     MemoryContext oldcxt;
     645                 :             : 
     646                 :             :     /* Create memory context for temporary allocations. */
     647                 :         250 :     memcxt = AllocSetContextCreate(CurrentMemoryContext,
     648                 :             :                                    "publicationListToArray to array",
     649                 :             :                                    ALLOCSET_DEFAULT_SIZES);
     650                 :         250 :     oldcxt = MemoryContextSwitchTo(memcxt);
     651                 :             : 
     652                 :         250 :     datums = palloc_array(Datum, list_length(publist));
     653                 :             : 
     654                 :         250 :     check_duplicates_in_publist(publist, datums);
     655                 :             : 
     656                 :         246 :     MemoryContextSwitchTo(oldcxt);
     657                 :             : 
     658                 :         246 :     arr = construct_array_builtin(datums, list_length(publist), TEXTOID);
     659                 :             : 
     660                 :         246 :     MemoryContextDelete(memcxt);
     661                 :             : 
     662                 :         246 :     return PointerGetDatum(arr);
     663                 :             : }
     664                 :             : 
     665                 :             : /*
     666                 :             :  * Create new subscription.
     667                 :             :  */
     668                 :             : ObjectAddress
     669                 :         337 : CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
     670                 :             :                    bool isTopLevel)
     671                 :             : {
     672                 :             :     Relation    rel;
     673                 :             :     ObjectAddress myself;
     674                 :             :     Oid         subid;
     675                 :             :     bool        nulls[Natts_pg_subscription];
     676                 :             :     Datum       values[Natts_pg_subscription];
     677                 :         337 :     Oid         owner = GetUserId();
     678                 :             :     HeapTuple   tup;
     679                 :             :     Oid         serverid;
     680                 :             :     char       *conninfo;
     681                 :             :     char        originname[NAMEDATALEN];
     682                 :             :     List       *publications;
     683                 :             :     uint32      supported_opts;
     684                 :         337 :     SubOpts     opts = {0};
     685                 :             :     AclResult   aclresult;
     686                 :         337 :     Oid         logrelid = InvalidOid;
     687                 :             : 
     688                 :             :     /*
     689                 :             :      * Parse and check options.
     690                 :             :      *
     691                 :             :      * Connection and publication should not be specified here.
     692                 :             :      */
     693                 :         337 :     supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
     694                 :             :                       SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
     695                 :             :                       SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
     696                 :             :                       SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
     697                 :             :                       SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
     698                 :             :                       SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
     699                 :             :                       SUBOPT_RETAIN_DEAD_TUPLES |
     700                 :             :                       SUBOPT_MAX_RETENTION_DURATION |
     701                 :             :                       SUBOPT_WAL_RECEIVER_TIMEOUT | SUBOPT_ORIGIN |
     702                 :             :                       SUBOPT_CONFLICT_LOG_DEST);
     703                 :         337 :     parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
     704                 :             : 
     705                 :             :     /*
     706                 :             :      * Since creating a replication slot is not transactional, rolling back
     707                 :             :      * the transaction leaves the created replication slot.  So we cannot run
     708                 :             :      * CREATE SUBSCRIPTION inside a transaction block if creating a
     709                 :             :      * replication slot.
     710                 :             :      */
     711         [ +  + ]:         265 :     if (opts.create_slot)
     712                 :         138 :         PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
     713                 :             : 
     714                 :             :     /*
     715                 :             :      * We don't want to allow unprivileged users to be able to trigger
     716                 :             :      * attempts to access arbitrary network destinations, so require the user
     717                 :             :      * to have been specifically authorized to create subscriptions.
     718                 :             :      */
     719         [ +  + ]:         261 :     if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
     720         [ +  - ]:           4 :         ereport(ERROR,
     721                 :             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     722                 :             :                  errmsg("permission denied to create subscription"),
     723                 :             :                  errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
     724                 :             :                            "pg_create_subscription")));
     725                 :             : 
     726                 :             :     /*
     727                 :             :      * Since a subscription is a database object, we also check for CREATE
     728                 :             :      * permission on the database.
     729                 :             :      */
     730                 :         257 :     aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
     731                 :             :                                 owner, ACL_CREATE);
     732         [ +  + ]:         257 :     if (aclresult != ACLCHECK_OK)
     733                 :           8 :         aclcheck_error(aclresult, OBJECT_DATABASE,
     734                 :           4 :                        get_database_name(MyDatabaseId));
     735                 :             : 
     736                 :             :     /*
     737                 :             :      * Non-superusers are required to set a password for authentication, and
     738                 :             :      * that password must be used by the target server, but the superuser can
     739                 :             :      * exempt a subscription from this requirement.
     740                 :             :      */
     741   [ +  +  +  + ]:         253 :     if (!opts.passwordrequired && !superuser_arg(owner))
     742         [ +  - ]:           4 :         ereport(ERROR,
     743                 :             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     744                 :             :                  errmsg("password_required=false is superuser-only"),
     745                 :             :                  errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
     746                 :             : 
     747                 :             :     /*
     748                 :             :      * If built with appropriate switch, whine when regression-testing
     749                 :             :      * conventions for subscription names are violated.
     750                 :             :      */
     751                 :             : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
     752                 :             :     if (strncmp(stmt->subname, "regress_", 8) != 0)
     753                 :             :         elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
     754                 :             : #endif
     755                 :             : 
     756                 :         249 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     757                 :             : 
     758                 :             :     /* Check if name is used */
     759                 :         249 :     subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
     760                 :             :                             ObjectIdGetDatum(MyDatabaseId), CStringGetDatum(stmt->subname));
     761         [ +  + ]:         249 :     if (OidIsValid(subid))
     762                 :             :     {
     763         [ +  - ]:           5 :         ereport(ERROR,
     764                 :             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     765                 :             :                  errmsg("subscription \"%s\" already exists",
     766                 :             :                         stmt->subname)));
     767                 :             :     }
     768                 :             : 
     769                 :             :     /*
     770                 :             :      * Ensure that system configuration parameters are set appropriately to
     771                 :             :      * support retain_dead_tuples and max_retention_duration.
     772                 :             :      */
     773                 :         244 :     CheckSubDeadTupleRetention(true, !opts.enabled, WARNING,
     774                 :         244 :                                opts.retaindeadtuples, opts.retaindeadtuples,
     775                 :         244 :                                (opts.maxretention > 0));
     776                 :             : 
     777         [ +  + ]:         244 :     if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
     778         [ +  - ]:         198 :         opts.slot_name == NULL)
     779                 :         198 :         opts.slot_name = stmt->subname;
     780                 :             : 
     781                 :             :     /* The default for synchronous_commit of subscriptions is off. */
     782         [ +  - ]:         244 :     if (opts.synchronous_commit == NULL)
     783                 :         244 :         opts.synchronous_commit = "off";
     784                 :             : 
     785                 :             :     /*
     786                 :             :      * The default for wal_receiver_timeout of subscriptions is -1, which
     787                 :             :      * means the value is inherited from the server configuration, command
     788                 :             :      * line, or role/database settings.
     789                 :             :      */
     790         [ +  - ]:         244 :     if (opts.wal_receiver_timeout == NULL)
     791                 :         244 :         opts.wal_receiver_timeout = "-1";
     792                 :             : 
     793                 :             :     /* Load the library providing us libpq calls. */
     794                 :         244 :     load_file("libpqwalreceiver", false);
     795                 :             : 
     796         [ +  + ]:         244 :     if (stmt->servername)
     797                 :             :     {
     798                 :             :         ForeignServer *server;
     799                 :             : 
     800                 :             :         Assert(!stmt->conninfo);
     801                 :          22 :         conninfo = NULL;
     802                 :             : 
     803                 :          22 :         server = GetForeignServerByName(stmt->servername, false);
     804                 :          22 :         aclresult = object_aclcheck(ForeignServerRelationId, server->serverid, owner, ACL_USAGE);
     805         [ +  + ]:          22 :         if (aclresult != ACLCHECK_OK)
     806                 :           4 :             aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername);
     807                 :             : 
     808                 :             :         /* make sure a user mapping exists */
     809                 :          18 :         GetUserMapping(owner, server->serverid);
     810                 :             : 
     811                 :          14 :         serverid = server->serverid;
     812                 :          14 :         conninfo = ForeignServerConnectionString(owner, server);
     813                 :             :     }
     814                 :             :     else
     815                 :             :     {
     816                 :             :         Assert(stmt->conninfo);
     817                 :             : 
     818                 :         222 :         serverid = InvalidOid;
     819                 :         222 :         conninfo = stmt->conninfo;
     820                 :             :     }
     821                 :             : 
     822                 :             :     /* Check the connection info string. */
     823   [ +  +  +  + ]:         232 :     walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
     824                 :             : 
     825                 :         220 :     publications = stmt->publication;
     826                 :             : 
     827                 :             :     /* Everything ok, form a new tuple. */
     828                 :         220 :     memset(values, 0, sizeof(values));
     829                 :         220 :     memset(nulls, false, sizeof(nulls));
     830                 :             : 
     831                 :         220 :     subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
     832                 :             :                                Anum_pg_subscription_oid);
     833                 :         220 :     values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
     834                 :         220 :     values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
     835                 :         220 :     values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
     836                 :         220 :     values[Anum_pg_subscription_subname - 1] =
     837                 :         220 :         DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
     838                 :         220 :     values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
     839                 :         220 :     values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
     840                 :         220 :     values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
     841                 :         220 :     values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
     842                 :         220 :     values[Anum_pg_subscription_subtwophasestate - 1] =
     843         [ +  + ]:         220 :         CharGetDatum(opts.twophase ?
     844                 :             :                      LOGICALREP_TWOPHASE_STATE_PENDING :
     845                 :             :                      LOGICALREP_TWOPHASE_STATE_DISABLED);
     846                 :         220 :     values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
     847                 :         220 :     values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
     848                 :         220 :     values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
     849                 :         220 :     values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
     850                 :         220 :     values[Anum_pg_subscription_subretaindeadtuples - 1] =
     851                 :         220 :         BoolGetDatum(opts.retaindeadtuples);
     852                 :         220 :     values[Anum_pg_subscription_submaxretention - 1] =
     853                 :         220 :         Int32GetDatum(opts.maxretention);
     854                 :         220 :     values[Anum_pg_subscription_subretentionactive - 1] =
     855                 :         220 :         BoolGetDatum(opts.retaindeadtuples);
     856                 :         220 :     values[Anum_pg_subscription_subserver - 1] = ObjectIdGetDatum(serverid);
     857         [ +  + ]:         220 :     if (!OidIsValid(serverid))
     858                 :         210 :         values[Anum_pg_subscription_subconninfo - 1] =
     859                 :         210 :             CStringGetTextDatum(conninfo);
     860                 :             :     else
     861                 :          10 :         nulls[Anum_pg_subscription_subconninfo - 1] = true;
     862         [ +  + ]:         220 :     if (opts.slot_name)
     863                 :         206 :         values[Anum_pg_subscription_subslotname - 1] =
     864                 :         206 :             DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
     865                 :             :     else
     866                 :          14 :         nulls[Anum_pg_subscription_subslotname - 1] = true;
     867                 :         220 :     values[Anum_pg_subscription_subsynccommit - 1] =
     868                 :         220 :         CStringGetTextDatum(opts.synchronous_commit);
     869                 :         220 :     values[Anum_pg_subscription_subwalrcvtimeout - 1] =
     870                 :         220 :         CStringGetTextDatum(opts.wal_receiver_timeout);
     871                 :         216 :     values[Anum_pg_subscription_subpublications - 1] =
     872                 :         220 :         publicationListToArray(publications);
     873                 :         216 :     values[Anum_pg_subscription_suborigin - 1] =
     874                 :         216 :         CStringGetTextDatum(opts.origin);
     875                 :             : 
     876                 :         216 :     values[Anum_pg_subscription_subconflictlogdest - 1] =
     877                 :         216 :         CStringGetTextDatum(ConflictLogDestNames[opts.conflictlogdest]);
     878                 :             : 
     879                 :             :     /*
     880                 :             :      * We create the conflict log table here, if required, so that its
     881                 :             :      * relation OID can be stored when inserting the pg_subscription tuple
     882                 :             :      * below.
     883                 :             :      */
     884   [ +  +  +  + ]:         216 :     if (CONFLICTS_LOGGED_TO_TABLE(opts.conflictlogdest))
     885                 :          10 :         logrelid = create_conflict_log_table(subid, stmt->subname, owner);
     886                 :             : 
     887                 :             :     /* Store table OID in the catalog. */
     888                 :         216 :     values[Anum_pg_subscription_subconflictlogrelid - 1] =
     889                 :         216 :         ObjectIdGetDatum(logrelid);
     890                 :             : 
     891                 :         216 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     892                 :             : 
     893                 :             :     /* Insert tuple into catalog. */
     894                 :         216 :     CatalogTupleInsert(rel, tup);
     895                 :         216 :     heap_freetuple(tup);
     896                 :             : 
     897                 :         216 :     recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
     898                 :             : 
     899                 :         216 :     ObjectAddressSet(myself, SubscriptionRelationId, subid);
     900                 :             : 
     901         [ +  + ]:         216 :     if (stmt->servername)
     902                 :             :     {
     903                 :             :         ObjectAddress referenced;
     904                 :             : 
     905                 :             :         Assert(OidIsValid(serverid));
     906                 :             : 
     907                 :          10 :         ObjectAddressSet(referenced, ForeignServerRelationId, serverid);
     908                 :          10 :         recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
     909                 :             :     }
     910                 :             : 
     911                 :             :     /*
     912                 :             :      * Establish an internal dependency between the conflict log table and the
     913                 :             :      * subscription.
     914                 :             :      *
     915                 :             :      * We use DEPENDENCY_INTERNAL to signify that the table's lifecycle is
     916                 :             :      * strictly tied to the subscription, similar to how a TOAST table relates
     917                 :             :      * to its main table or a sequence relates to an identity column.
     918                 :             :      *
     919                 :             :      * This ensures the conflict log table is automatically reaped during a
     920                 :             :      * DROP SUBSCRIPTION via performDeletion().
     921                 :             :      */
     922         [ +  + ]:         216 :     if (OidIsValid(logrelid))
     923                 :             :     {
     924                 :             :         ObjectAddress cltaddr;
     925                 :             : 
     926                 :          10 :         ObjectAddressSet(cltaddr, RelationRelationId, logrelid);
     927                 :          10 :         recordDependencyOn(&cltaddr, &myself, DEPENDENCY_INTERNAL);
     928                 :             :     }
     929                 :             : 
     930                 :             :     /*
     931                 :             :      * A replication origin is currently created for all subscriptions,
     932                 :             :      * including those that only contain sequences or are otherwise empty.
     933                 :             :      *
     934                 :             :      * XXX: While this is technically unnecessary, optimizing it would require
     935                 :             :      * additional logic to skip origin creation during DDL operations and
     936                 :             :      * apply workers initialization, and to handle origin creation dynamically
     937                 :             :      * when tables are added to the subscription. It is not clear whether
     938                 :             :      * preventing creation of origins is worth additional complexity.
     939                 :             :      */
     940                 :         216 :     ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
     941                 :         216 :     replorigin_create(originname);
     942                 :             : 
     943                 :             :     /*
     944                 :             :      * Connect to remote side to execute requested commands and fetch table
     945                 :             :      * and sequence info.
     946                 :             :      */
     947         [ +  + ]:         216 :     if (opts.connect)
     948                 :             :     {
     949                 :             :         char       *err;
     950                 :             :         WalReceiverConn *wrconn;
     951                 :             :         bool        must_use_password;
     952                 :             : 
     953                 :             :         /* Try to connect to the publisher. */
     954   [ -  +  -  - ]:         130 :         must_use_password = !superuser_arg(owner) && opts.passwordrequired;
     955                 :         130 :         wrconn = walrcv_connect(conninfo, true, true, must_use_password,
     956                 :             :                                 stmt->subname, &err);
     957         [ +  + ]:         130 :         if (!wrconn)
     958         [ +  - ]:           4 :             ereport(ERROR,
     959                 :             :                     (errcode(ERRCODE_CONNECTION_FAILURE),
     960                 :             :                      errmsg("subscription \"%s\" could not connect to the publisher: %s",
     961                 :             :                             stmt->subname, err)));
     962                 :             : 
     963         [ +  + ]:         126 :         PG_TRY();
     964                 :             :         {
     965                 :         126 :             bool        has_tables = false;
     966                 :             :             List       *pubrels;
     967                 :             :             char        relation_state;
     968                 :             : 
     969                 :         126 :             check_publications(wrconn, publications);
     970                 :         126 :             check_publications_origin_tables(wrconn, publications,
     971                 :         126 :                                              opts.copy_data,
     972                 :         126 :                                              opts.retaindeadtuples, opts.origin,
     973                 :             :                                              NULL, 0, stmt->subname);
     974                 :         126 :             check_publications_origin_sequences(wrconn, publications,
     975                 :         126 :                                                 opts.copy_data, opts.origin,
     976                 :             :                                                 NULL, 0, stmt->subname);
     977                 :             : 
     978         [ +  + ]:         126 :             if (opts.retaindeadtuples)
     979                 :           3 :                 check_pub_dead_tuple_retention(wrconn);
     980                 :             : 
     981                 :             :             /*
     982                 :             :              * Set sync state based on if we were asked to do data copy or
     983                 :             :              * not.
     984                 :             :              */
     985         [ +  + ]:         126 :             relation_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
     986                 :             : 
     987                 :             :             /*
     988                 :             :              * Build local relation status info. Relations are for both tables
     989                 :             :              * and sequences from the publisher.
     990                 :             :              */
     991                 :         126 :             pubrels = fetch_relation_list(wrconn, publications);
     992                 :             : 
     993   [ +  +  +  +  :         445 :             foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
                   +  + ]
     994                 :             :             {
     995                 :             :                 Oid         relid;
     996                 :             :                 char        relkind;
     997                 :         195 :                 RangeVar   *rv = pubrelinfo->rv;
     998                 :             : 
     999                 :         195 :                 relid = RangeVarGetRelid(rv, AccessShareLock, false);
    1000                 :         195 :                 relkind = get_rel_relkind(relid);
    1001                 :             : 
    1002                 :             :                 /* Check for supported relkind. */
    1003                 :         195 :                 CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
    1004                 :         195 :                                          rv->schemaname, rv->relname);
    1005                 :         195 :                 has_tables |= (relkind != RELKIND_SEQUENCE);
    1006                 :         195 :                 AddSubscriptionRelState(subid, relid, relation_state,
    1007                 :             :                                         InvalidXLogRecPtr, true);
    1008                 :             :             }
    1009                 :             : 
    1010                 :             :             /*
    1011                 :             :              * If requested, create permanent slot for the subscription. We
    1012                 :             :              * won't use the initial snapshot for anything, so no need to
    1013                 :             :              * export it.
    1014                 :             :              *
    1015                 :             :              * XXX: Similar to origins, it is not clear whether preventing the
    1016                 :             :              * slot creation for empty and sequence-only subscriptions is
    1017                 :             :              * worth additional complexity.
    1018                 :             :              */
    1019         [ +  + ]:         125 :             if (opts.create_slot)
    1020                 :             :             {
    1021                 :         120 :                 bool        twophase_enabled = false;
    1022                 :             : 
    1023                 :             :                 Assert(opts.slot_name);
    1024                 :             : 
    1025                 :             :                 /*
    1026                 :             :                  * Even if two_phase is set, don't create the slot with
    1027                 :             :                  * two-phase enabled. Will enable it once all the tables are
    1028                 :             :                  * synced and ready. This avoids race-conditions like prepared
    1029                 :             :                  * transactions being skipped due to changes not being applied
    1030                 :             :                  * due to checks in should_apply_changes_for_rel() when
    1031                 :             :                  * tablesync for the corresponding tables are in progress. See
    1032                 :             :                  * comments atop worker.c.
    1033                 :             :                  *
    1034                 :             :                  * Note that if tables were specified but copy_data is false
    1035                 :             :                  * then it is safe to enable two_phase up-front because those
    1036                 :             :                  * tables are already initially in READY state. When the
    1037                 :             :                  * subscription has no tables, we leave the twophase state as
    1038                 :             :                  * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
    1039                 :             :                  * PUBLICATION to work.
    1040                 :             :                  */
    1041   [ +  +  +  +  :         120 :                 if (opts.twophase && !opts.copy_data && has_tables)
                   +  - ]
    1042                 :           1 :                     twophase_enabled = true;
    1043                 :             : 
    1044                 :         120 :                 walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
    1045                 :             :                                    opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
    1046                 :             : 
    1047         [ +  + ]:         120 :                 if (twophase_enabled)
    1048                 :           1 :                     UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
    1049                 :             : 
    1050         [ +  - ]:         120 :                 ereport(NOTICE,
    1051                 :             :                         (errmsg("created replication slot \"%s\" on publisher",
    1052                 :             :                                 opts.slot_name)));
    1053                 :             :             }
    1054                 :             :         }
    1055                 :           1 :         PG_FINALLY();
    1056                 :             :         {
    1057                 :         126 :             walrcv_disconnect(wrconn);
    1058                 :             :         }
    1059         [ +  + ]:         126 :         PG_END_TRY();
    1060                 :             :     }
    1061                 :             :     else
    1062         [ +  - ]:          86 :         ereport(WARNING,
    1063                 :             :                 (errmsg("subscription was created, but is not connected"),
    1064                 :             :                  errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.")));
    1065                 :             : 
    1066                 :         211 :     table_close(rel, RowExclusiveLock);
    1067                 :             : 
    1068                 :         211 :     pgstat_create_subscription(subid);
    1069                 :             : 
    1070                 :             :     /*
    1071                 :             :      * Notify the launcher to start the apply worker if the subscription is
    1072                 :             :      * enabled, or to create the conflict detection slot if retain_dead_tuples
    1073                 :             :      * is enabled.
    1074                 :             :      *
    1075                 :             :      * Creating the conflict detection slot is essential even when the
    1076                 :             :      * subscription is not enabled. This ensures that dead tuples are
    1077                 :             :      * retained, which is necessary for accurately identifying the type of
    1078                 :             :      * conflict during replication.
    1079                 :             :      */
    1080   [ +  +  +  + ]:         211 :     if (opts.enabled || opts.retaindeadtuples)
    1081                 :         119 :         ApplyLauncherWakeupAtCommit();
    1082                 :             : 
    1083         [ -  + ]:         211 :     InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
    1084                 :             : 
    1085                 :         211 :     return myself;
    1086                 :             : }
    1087                 :             : 
    1088                 :             : static void
    1089                 :          39 : AlterSubscription_refresh(Subscription *sub, bool copy_data,
    1090                 :             :                           List *validate_publications)
    1091                 :             : {
    1092                 :             :     char       *err;
    1093                 :          39 :     List       *pubrels = NIL;
    1094                 :             :     Oid        *pubrel_local_oids;
    1095                 :             :     List       *subrel_states;
    1096                 :          39 :     List       *sub_remove_rels = NIL;
    1097                 :             :     Oid        *subrel_local_oids;
    1098                 :             :     Oid        *subseq_local_oids;
    1099                 :             :     int         subrel_count;
    1100                 :             :     ListCell   *lc;
    1101                 :             :     int         off;
    1102                 :          39 :     int         tbl_count = 0;
    1103                 :          39 :     int         seq_count = 0;
    1104                 :          39 :     Relation    rel = NULL;
    1105                 :             :     typedef struct SubRemoveRels
    1106                 :             :     {
    1107                 :             :         Oid         relid;
    1108                 :             :         char        state;
    1109                 :             :     } SubRemoveRels;
    1110                 :             : 
    1111                 :             :     WalReceiverConn *wrconn;
    1112                 :             :     bool        must_use_password;
    1113                 :             : 
    1114                 :             :     /* Load the library providing us libpq calls. */
    1115                 :          39 :     load_file("libpqwalreceiver", false);
    1116                 :             : 
    1117                 :             :     /* Try to connect to the publisher. */
    1118   [ +  -  +  + ]:          39 :     must_use_password = sub->passwordrequired && !sub->ownersuperuser;
    1119                 :          39 :     wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
    1120                 :             :                             sub->name, &err);
    1121         [ -  + ]:          38 :     if (!wrconn)
    1122         [ #  # ]:           0 :         ereport(ERROR,
    1123                 :             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1124                 :             :                  errmsg("subscription \"%s\" could not connect to the publisher: %s",
    1125                 :             :                         sub->name, err)));
    1126                 :             : 
    1127         [ +  - ]:          38 :     PG_TRY();
    1128                 :             :     {
    1129         [ +  + ]:          38 :         if (validate_publications)
    1130                 :           9 :             check_publications(wrconn, validate_publications);
    1131                 :             : 
    1132                 :             :         /* Get the relation list from publisher. */
    1133                 :          38 :         pubrels = fetch_relation_list(wrconn, sub->publications);
    1134                 :             : 
    1135                 :             :         /* Get local relation list. */
    1136                 :          38 :         subrel_states = GetSubscriptionRelations(sub->oid, true, true, false);
    1137                 :          38 :         subrel_count = list_length(subrel_states);
    1138                 :             : 
    1139                 :             :         /*
    1140                 :             :          * Build qsorted arrays of local table oids and sequence oids for
    1141                 :             :          * faster lookup. This can potentially contain all tables and
    1142                 :             :          * sequences in the database so speed of lookup is important.
    1143                 :             :          *
    1144                 :             :          * We do not yet know the exact count of tables and sequences, so we
    1145                 :             :          * allocate separate arrays for table OIDs and sequence OIDs based on
    1146                 :             :          * the total number of relations (subrel_count).
    1147                 :             :          */
    1148                 :          38 :         subrel_local_oids = palloc(subrel_count * sizeof(Oid));
    1149                 :          38 :         subseq_local_oids = palloc(subrel_count * sizeof(Oid));
    1150   [ +  +  +  +  :         136 :         foreach(lc, subrel_states)
                   +  + ]
    1151                 :             :         {
    1152                 :          98 :             SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
    1153                 :             : 
    1154         [ +  + ]:          98 :             if (get_rel_relkind(relstate->relid) == RELKIND_SEQUENCE)
    1155                 :           9 :                 subseq_local_oids[seq_count++] = relstate->relid;
    1156                 :             :             else
    1157                 :          89 :                 subrel_local_oids[tbl_count++] = relstate->relid;
    1158                 :             :         }
    1159                 :             : 
    1160                 :          38 :         qsort(subrel_local_oids, tbl_count, sizeof(Oid), oid_cmp);
    1161                 :          38 :         check_publications_origin_tables(wrconn, sub->publications, copy_data,
    1162                 :          38 :                                          sub->retaindeadtuples, sub->origin,
    1163                 :             :                                          subrel_local_oids, tbl_count,
    1164                 :             :                                          sub->name);
    1165                 :             : 
    1166                 :          38 :         qsort(subseq_local_oids, seq_count, sizeof(Oid), oid_cmp);
    1167                 :          38 :         check_publications_origin_sequences(wrconn, sub->publications,
    1168                 :             :                                             copy_data, sub->origin,
    1169                 :             :                                             subseq_local_oids, seq_count,
    1170                 :             :                                             sub->name);
    1171                 :             : 
    1172                 :             :         /*
    1173                 :             :          * Walk over the remote relations and try to match them to locally
    1174                 :             :          * known relations. If the relation is not known locally create a new
    1175                 :             :          * state for it.
    1176                 :             :          *
    1177                 :             :          * Also builds array of local oids of remote relations for the next
    1178                 :             :          * step.
    1179                 :             :          */
    1180                 :          38 :         off = 0;
    1181                 :          38 :         pubrel_local_oids = palloc(list_length(pubrels) * sizeof(Oid));
    1182                 :             : 
    1183   [ +  +  +  +  :         183 :         foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
                   +  + ]
    1184                 :             :         {
    1185                 :         107 :             RangeVar   *rv = pubrelinfo->rv;
    1186                 :             :             Oid         relid;
    1187                 :             :             char        relkind;
    1188                 :             : 
    1189                 :         107 :             relid = RangeVarGetRelid(rv, AccessShareLock, false);
    1190                 :         107 :             relkind = get_rel_relkind(relid);
    1191                 :             : 
    1192                 :             :             /* Check for supported relkind. */
    1193                 :         107 :             CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
    1194                 :         107 :                                      rv->schemaname, rv->relname);
    1195                 :             : 
    1196                 :         107 :             pubrel_local_oids[off++] = relid;
    1197                 :             : 
    1198         [ +  + ]:         107 :             if (!bsearch(&relid, subrel_local_oids,
    1199                 :          39 :                          tbl_count, sizeof(Oid), oid_cmp) &&
    1200         [ +  + ]:          39 :                 !bsearch(&relid, subseq_local_oids,
    1201                 :             :                          seq_count, sizeof(Oid), oid_cmp))
    1202                 :             :             {
    1203         [ +  + ]:          30 :                 AddSubscriptionRelState(sub->oid, relid,
    1204                 :             :                                         copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
    1205                 :             :                                         InvalidXLogRecPtr, true);
    1206   [ +  +  -  + ]:          30 :                 ereport(DEBUG1,
    1207                 :             :                         errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"",
    1208                 :             :                                         relkind == RELKIND_SEQUENCE ? "sequence" : "table",
    1209                 :             :                                         rv->schemaname, rv->relname, sub->name));
    1210                 :             :             }
    1211                 :             :         }
    1212                 :             : 
    1213                 :             :         /*
    1214                 :             :          * Next remove state for tables we should not care about anymore using
    1215                 :             :          * the data we collected above
    1216                 :             :          */
    1217                 :          38 :         qsort(pubrel_local_oids, list_length(pubrels), sizeof(Oid), oid_cmp);
    1218                 :             : 
    1219         [ +  + ]:         127 :         for (off = 0; off < tbl_count; off++)
    1220                 :             :         {
    1221                 :          89 :             Oid         relid = subrel_local_oids[off];
    1222                 :             : 
    1223         [ +  + ]:          89 :             if (!bsearch(&relid, pubrel_local_oids,
    1224                 :          89 :                          list_length(pubrels), sizeof(Oid), oid_cmp))
    1225                 :             :             {
    1226                 :             :                 char        state;
    1227                 :             :                 XLogRecPtr  statelsn;
    1228                 :          21 :                 SubRemoveRels *remove_rel = palloc_object(SubRemoveRels);
    1229                 :             : 
    1230                 :             :                 /*
    1231                 :             :                  * Lock pg_subscription_rel with AccessExclusiveLock to
    1232                 :             :                  * prevent any race conditions with the apply worker
    1233                 :             :                  * re-launching workers at the same time this code is trying
    1234                 :             :                  * to remove those tables.
    1235                 :             :                  *
    1236                 :             :                  * Even if new worker for this particular rel is restarted it
    1237                 :             :                  * won't be able to make any progress as we hold exclusive
    1238                 :             :                  * lock on pg_subscription_rel till the transaction end. It
    1239                 :             :                  * will simply exit as there is no corresponding rel entry.
    1240                 :             :                  *
    1241                 :             :                  * This locking also ensures that the state of rels won't
    1242                 :             :                  * change till we are done with this refresh operation.
    1243                 :             :                  */
    1244         [ +  + ]:          21 :                 if (!rel)
    1245                 :           9 :                     rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
    1246                 :             : 
    1247                 :             :                 /* Last known rel state. */
    1248                 :          21 :                 state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
    1249                 :             : 
    1250                 :          21 :                 RemoveSubscriptionRel(sub->oid, relid);
    1251                 :             : 
    1252                 :          21 :                 remove_rel->relid = relid;
    1253                 :          21 :                 remove_rel->state = state;
    1254                 :             : 
    1255                 :          21 :                 sub_remove_rels = lappend(sub_remove_rels, remove_rel);
    1256                 :             : 
    1257                 :          21 :                 logicalrep_worker_stop(WORKERTYPE_TABLESYNC, sub->oid, relid);
    1258                 :             : 
    1259                 :             :                 /*
    1260                 :             :                  * For READY state, we would have already dropped the
    1261                 :             :                  * tablesync origin.
    1262                 :             :                  */
    1263         [ -  + ]:          21 :                 if (state != SUBREL_STATE_READY)
    1264                 :             :                 {
    1265                 :             :                     char        originname[NAMEDATALEN];
    1266                 :             : 
    1267                 :             :                     /*
    1268                 :             :                      * Drop the tablesync's origin tracking if exists.
    1269                 :             :                      *
    1270                 :             :                      * It is possible that the origin is not yet created for
    1271                 :             :                      * tablesync worker, this can happen for the states before
    1272                 :             :                      * SUBREL_STATE_DATASYNC. The tablesync worker or apply
    1273                 :             :                      * worker can also concurrently try to drop the origin and
    1274                 :             :                      * by this time the origin might be already removed. For
    1275                 :             :                      * these reasons, passing missing_ok = true.
    1276                 :             :                      */
    1277                 :           0 :                     ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
    1278                 :             :                                                        sizeof(originname));
    1279                 :           0 :                     replorigin_drop_by_name(originname, true, false);
    1280                 :             :                 }
    1281                 :             : 
    1282         [ +  + ]:          21 :                 ereport(DEBUG1,
    1283                 :             :                         (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
    1284                 :             :                                          get_namespace_name(get_rel_namespace(relid)),
    1285                 :             :                                          get_rel_name(relid),
    1286                 :             :                                          sub->name)));
    1287                 :             :             }
    1288                 :             :         }
    1289                 :             : 
    1290                 :             :         /*
    1291                 :             :          * Drop the tablesync slots associated with removed tables. This has
    1292                 :             :          * to be at the end because otherwise if there is an error while doing
    1293                 :             :          * the database operations we won't be able to rollback dropped slots.
    1294                 :             :          */
    1295   [ +  +  +  +  :          97 :         foreach_ptr(SubRemoveRels, sub_remove_rel, sub_remove_rels)
                   +  + ]
    1296                 :             :         {
    1297         [ -  + ]:          21 :             if (sub_remove_rel->state != SUBREL_STATE_READY &&
    1298         [ #  # ]:           0 :                 sub_remove_rel->state != SUBREL_STATE_SYNCDONE)
    1299                 :             :             {
    1300                 :           0 :                 char        syncslotname[NAMEDATALEN] = {0};
    1301                 :             : 
    1302                 :             :                 /*
    1303                 :             :                  * For READY/SYNCDONE states we know the tablesync slot has
    1304                 :             :                  * already been dropped by the tablesync worker.
    1305                 :             :                  *
    1306                 :             :                  * For other states, there is no certainty, maybe the slot
    1307                 :             :                  * does not exist yet. Also, if we fail after removing some of
    1308                 :             :                  * the slots, next time, it will again try to drop already
    1309                 :             :                  * dropped slots and fail. For these reasons, we allow
    1310                 :             :                  * missing_ok = true for the drop.
    1311                 :             :                  */
    1312                 :           0 :                 ReplicationSlotNameForTablesync(sub->oid, sub_remove_rel->relid,
    1313                 :             :                                                 syncslotname, sizeof(syncslotname));
    1314                 :           0 :                 ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
    1315                 :             :             }
    1316                 :             :         }
    1317                 :             : 
    1318                 :             :         /*
    1319                 :             :          * Next remove state for sequences we should not care about anymore
    1320                 :             :          * using the data we collected above
    1321                 :             :          */
    1322         [ +  + ]:          47 :         for (off = 0; off < seq_count; off++)
    1323                 :             :         {
    1324                 :           9 :             Oid         relid = subseq_local_oids[off];
    1325                 :             : 
    1326         [ -  + ]:           9 :             if (!bsearch(&relid, pubrel_local_oids,
    1327                 :           9 :                          list_length(pubrels), sizeof(Oid), oid_cmp))
    1328                 :             :             {
    1329                 :             :                 /*
    1330                 :             :                  * This locking ensures that the state of rels won't change
    1331                 :             :                  * till we are done with this refresh operation.
    1332                 :             :                  */
    1333         [ #  # ]:           0 :                 if (!rel)
    1334                 :           0 :                     rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
    1335                 :             : 
    1336                 :           0 :                 RemoveSubscriptionRel(sub->oid, relid);
    1337                 :             : 
    1338         [ #  # ]:           0 :                 ereport(DEBUG1,
    1339                 :             :                         errmsg_internal("sequence \"%s.%s\" removed from subscription \"%s\"",
    1340                 :             :                                         get_namespace_name(get_rel_namespace(relid)),
    1341                 :             :                                         get_rel_name(relid),
    1342                 :             :                                         sub->name));
    1343                 :             :             }
    1344                 :             :         }
    1345                 :             :     }
    1346                 :           0 :     PG_FINALLY();
    1347                 :             :     {
    1348                 :          38 :         walrcv_disconnect(wrconn);
    1349                 :             :     }
    1350         [ -  + ]:          38 :     PG_END_TRY();
    1351                 :             : 
    1352         [ +  + ]:          38 :     if (rel)
    1353                 :           9 :         table_close(rel, NoLock);
    1354                 :          38 : }
    1355                 :             : 
    1356                 :             : /*
    1357                 :             :  * Marks all sequences with INIT state.
    1358                 :             :  */
    1359                 :             : static void
    1360                 :           3 : AlterSubscription_refresh_seq(Subscription *sub)
    1361                 :             : {
    1362                 :           3 :     char       *err = NULL;
    1363                 :             :     WalReceiverConn *wrconn;
    1364                 :             :     bool        must_use_password;
    1365                 :             : 
    1366                 :             :     /* Load the library providing us libpq calls. */
    1367                 :           3 :     load_file("libpqwalreceiver", false);
    1368                 :             : 
    1369                 :             :     /* Try to connect to the publisher. */
    1370   [ +  -  -  + ]:           3 :     must_use_password = sub->passwordrequired && !sub->ownersuperuser;
    1371                 :           3 :     wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
    1372                 :             :                             sub->name, &err);
    1373         [ -  + ]:           3 :     if (!wrconn)
    1374         [ #  # ]:           0 :         ereport(ERROR,
    1375                 :             :                 errcode(ERRCODE_CONNECTION_FAILURE),
    1376                 :             :                 errmsg("subscription \"%s\" could not connect to the publisher: %s",
    1377                 :             :                        sub->name, err));
    1378                 :             : 
    1379         [ +  - ]:           3 :     PG_TRY();
    1380                 :             :     {
    1381                 :             :         List       *subrel_states;
    1382                 :             : 
    1383                 :           3 :         check_publications_origin_sequences(wrconn, sub->publications, true,
    1384                 :             :                                             sub->origin, NULL, 0, sub->name);
    1385                 :             : 
    1386                 :             :         /* Get local sequence list. */
    1387                 :           3 :         subrel_states = GetSubscriptionRelations(sub->oid, false, true, false);
    1388   [ +  -  +  +  :          19 :         foreach_ptr(SubscriptionRelState, subrel, subrel_states)
                   +  + ]
    1389                 :             :         {
    1390                 :          13 :             Oid         relid = subrel->relid;
    1391                 :             : 
    1392                 :          13 :             UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT,
    1393                 :             :                                        InvalidXLogRecPtr, false);
    1394         [ -  + ]:          13 :             ereport(DEBUG1,
    1395                 :             :                     errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
    1396                 :             :                                     get_namespace_name(get_rel_namespace(relid)),
    1397                 :             :                                     get_rel_name(relid),
    1398                 :             :                                     sub->name));
    1399                 :             :         }
    1400                 :             :     }
    1401                 :           0 :     PG_FINALLY();
    1402                 :             :     {
    1403                 :           3 :         walrcv_disconnect(wrconn);
    1404                 :             :     }
    1405         [ -  + ]:           3 :     PG_END_TRY();
    1406                 :           3 : }
    1407                 :             : 
    1408                 :             : /*
    1409                 :             :  * Common checks for altering failover, two_phase, and retain_dead_tuples
    1410                 :             :  * options.
    1411                 :             :  */
    1412                 :             : static void
    1413                 :          14 : CheckAlterSubOption(Subscription *sub, const char *option,
    1414                 :             :                     bool slot_needs_update, bool isTopLevel)
    1415                 :             : {
    1416                 :             :     Assert(strcmp(option, "failover") == 0 ||
    1417                 :             :            strcmp(option, "two_phase") == 0 ||
    1418                 :             :            strcmp(option, "retain_dead_tuples") == 0);
    1419                 :             : 
    1420                 :             :     /*
    1421                 :             :      * Altering the retain_dead_tuples option does not update the slot on the
    1422                 :             :      * publisher.
    1423                 :             :      */
    1424                 :             :     Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0);
    1425                 :             : 
    1426                 :             :     /*
    1427                 :             :      * Do not allow changing the option if the subscription is enabled. This
    1428                 :             :      * is because both failover and two_phase options of the slot on the
    1429                 :             :      * publisher cannot be modified if the slot is currently acquired by the
    1430                 :             :      * existing walsender.
    1431                 :             :      *
    1432                 :             :      * Note that two_phase is enabled (aka changed from 'false' to 'true') on
    1433                 :             :      * the publisher by the existing walsender, so we could have allowed that
    1434                 :             :      * even when the subscription is enabled. But we kept this restriction for
    1435                 :             :      * the sake of consistency and simplicity.
    1436                 :             :      *
    1437                 :             :      * Additionally, do not allow changing the retain_dead_tuples option when
    1438                 :             :      * the subscription is enabled to prevent race conditions arising from the
    1439                 :             :      * new option value being acknowledged asynchronously by the launcher and
    1440                 :             :      * apply workers.
    1441                 :             :      *
    1442                 :             :      * Without the restriction, a race condition may arise when a user
    1443                 :             :      * disables and immediately re-enables the retain_dead_tuples option. In
    1444                 :             :      * this case, the launcher might drop the slot upon noticing the disabled
    1445                 :             :      * action, while the apply worker may keep maintaining
    1446                 :             :      * oldest_nonremovable_xid without noticing the option change. During this
    1447                 :             :      * period, a transaction ID wraparound could falsely make this ID appear
    1448                 :             :      * as if it originates from the future w.r.t the transaction ID stored in
    1449                 :             :      * the slot maintained by launcher.
    1450                 :             :      *
    1451                 :             :      * Similarly, if the user enables retain_dead_tuples concurrently with the
    1452                 :             :      * launcher starting the worker, the apply worker may start calculating
    1453                 :             :      * oldest_nonremovable_xid before the launcher notices the enable action.
    1454                 :             :      * Consequently, the launcher may update slot.xmin to a newer value than
    1455                 :             :      * that maintained by the worker. In subsequent cycles, upon integrating
    1456                 :             :      * the worker's oldest_nonremovable_xid, the launcher might detect a
    1457                 :             :      * retreat in the calculated xmin, necessitating additional handling.
    1458                 :             :      *
    1459                 :             :      * XXX To address the above race conditions, we can define
    1460                 :             :      * oldest_nonremovable_xid as FullTransactionId and adds the check to
    1461                 :             :      * disallow retreating the conflict slot's xmin. For now, we kept the
    1462                 :             :      * implementation simple by disallowing change to the retain_dead_tuples,
    1463                 :             :      * but in the future we can change this after some more analysis.
    1464                 :             :      *
    1465                 :             :      * Note that we could restrict only the enabling of retain_dead_tuples to
    1466                 :             :      * avoid the race conditions described above, but we maintain the
    1467                 :             :      * restriction for both enable and disable operations for the sake of
    1468                 :             :      * consistency.
    1469                 :             :      */
    1470         [ +  + ]:          14 :     if (sub->enabled)
    1471         [ +  - ]:           2 :         ereport(ERROR,
    1472                 :             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1473                 :             :                  errmsg("cannot set option \"%s\" for enabled subscription",
    1474                 :             :                         option)));
    1475                 :             : 
    1476         [ +  + ]:          12 :     if (slot_needs_update)
    1477                 :             :     {
    1478                 :             :         StringInfoData cmd;
    1479                 :             : 
    1480                 :             :         /*
    1481                 :             :          * A valid slot must be associated with the subscription for us to
    1482                 :             :          * modify any of the slot's properties.
    1483                 :             :          */
    1484         [ -  + ]:           9 :         if (!sub->slotname)
    1485         [ #  # ]:           0 :             ereport(ERROR,
    1486                 :             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1487                 :             :                      errmsg("cannot set option \"%s\" for a subscription that does not have a slot name",
    1488                 :             :                             option)));
    1489                 :             : 
    1490                 :             :         /* The changed option of the slot can't be rolled back. */
    1491                 :           9 :         initStringInfo(&cmd);
    1492                 :           9 :         appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
    1493                 :             : 
    1494                 :           9 :         PreventInTransactionBlock(isTopLevel, cmd.data);
    1495                 :           5 :         pfree(cmd.data);
    1496                 :             :     }
    1497                 :           8 : }
    1498                 :             : 
    1499                 :             : /*
    1500                 :             :  * alter_sub_conflict_log_dest
    1501                 :             :  *
    1502                 :             :  * When the subscription's 'conflict_log_destination' is changed, update the
    1503                 :             :  * conflict log table if required.
    1504                 :             :  *
    1505                 :             :  * If the new destination no longer requires a conflict log table, the existing
    1506                 :             :  * conflict log table associated with the subscription is removed via internal
    1507                 :             :  * dependency cleanup to prevent orphaned relations.
    1508                 :             :  *
    1509                 :             :  * On success, *conflicttablerelid is set to the OID of the conflict log table
    1510                 :             :  * that was created or validated, or to InvalidOid if no table is required.
    1511                 :             :  *
    1512                 :             :  * Returns true if the subscription's conflict log table reference must be
    1513                 :             :  * updated as a result of the destination change; false otherwise.
    1514                 :             :  */
    1515                 :             : static bool
    1516                 :          12 : alter_sub_conflict_log_dest(Subscription *sub, ConflictLogDest oldlogdest,
    1517                 :             :                             ConflictLogDest newlogdest,
    1518                 :             :                             Oid *conflicttablerelid)
    1519                 :             : {
    1520                 :             :     bool        want_table;
    1521                 :             :     bool        has_oldtable;
    1522                 :          12 :     bool        update_relid = false;
    1523                 :          12 :     Oid         relid = InvalidOid;
    1524                 :             : 
    1525   [ +  +  +  + ]:          12 :     want_table = CONFLICTS_LOGGED_TO_TABLE(newlogdest);
    1526   [ +  +  +  + ]:          12 :     has_oldtable = CONFLICTS_LOGGED_TO_TABLE(oldlogdest);
    1527                 :             : 
    1528         [ +  + ]:          12 :     if (has_oldtable)
    1529                 :             :     {
    1530                 :             :         /* There is a conflict log table already. */
    1531         [ +  + ]:           8 :         if (!want_table)
    1532                 :             :         {
    1533                 :           4 :             drop_sub_conflict_log_table(sub->oid, sub->name,
    1534                 :             :                                         sub->conflictlogrelid);
    1535                 :           4 :             update_relid = true;
    1536                 :             :         }
    1537                 :             :     }
    1538                 :             :     else
    1539                 :             :     {
    1540                 :             :         /* There was no previous conflict log table. */
    1541         [ +  - ]:           4 :         if (want_table)
    1542                 :             :         {
    1543                 :             :             ObjectAddress cltaddr;
    1544                 :             :             ObjectAddress subobj;
    1545                 :             : 
    1546                 :           4 :             relid = create_conflict_log_table(sub->oid, sub->name, sub->owner);
    1547                 :           4 :             update_relid = true;
    1548                 :             : 
    1549                 :             :             /*
    1550                 :             :              * Establish an internal dependency between the conflict log table
    1551                 :             :              * and the subscription.  For details refer comments in
    1552                 :             :              * CreateSubscription function.
    1553                 :             :              */
    1554                 :           4 :             ObjectAddressSet(cltaddr, RelationRelationId, relid);
    1555                 :           4 :             ObjectAddressSet(subobj, SubscriptionRelationId, sub->oid);
    1556                 :           4 :             recordDependencyOn(&cltaddr, &subobj, DEPENDENCY_INTERNAL);
    1557                 :             :         }
    1558                 :             :     }
    1559                 :             : 
    1560                 :          12 :     *conflicttablerelid = relid;
    1561                 :          12 :     return update_relid;
    1562                 :             : }
    1563                 :             : 
    1564                 :             : /*
    1565                 :             :  * Alter the existing subscription.
    1566                 :             :  */
    1567                 :             : ObjectAddress
    1568                 :         408 : AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
    1569                 :             :                   bool isTopLevel)
    1570                 :             : {
    1571                 :             :     Relation    rel;
    1572                 :             :     ObjectAddress myself;
    1573                 :             :     bool        nulls[Natts_pg_subscription];
    1574                 :             :     bool        replaces[Natts_pg_subscription];
    1575                 :             :     Datum       values[Natts_pg_subscription];
    1576                 :             :     HeapTuple   tup;
    1577                 :             :     Oid         subid;
    1578                 :         408 :     bool        orig_conninfo_needed = true;
    1579                 :         408 :     bool        update_tuple = false;
    1580                 :         408 :     bool        update_failover = false;
    1581                 :         408 :     bool        update_two_phase = false;
    1582                 :         408 :     bool        check_pub_rdt = false;
    1583                 :             :     bool        retain_dead_tuples;
    1584                 :             :     int         max_retention;
    1585                 :             :     bool        retention_active;
    1586                 :         408 :     char       *new_conninfo = NULL;
    1587                 :             :     char       *origin;
    1588                 :             :     Subscription *sub;
    1589                 :             :     Form_pg_subscription form;
    1590                 :             :     uint32      supported_opts;
    1591                 :         408 :     SubOpts     opts = {0};
    1592                 :             : 
    1593                 :         408 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    1594                 :             : 
    1595                 :             :     /* Fetch the existing tuple. */
    1596                 :         408 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
    1597                 :             :                               CStringGetDatum(stmt->subname));
    1598                 :             : 
    1599         [ +  + ]:         408 :     if (!HeapTupleIsValid(tup))
    1600         [ +  - ]:           4 :         ereport(ERROR,
    1601                 :             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1602                 :             :                  errmsg("subscription \"%s\" does not exist",
    1603                 :             :                         stmt->subname)));
    1604                 :             : 
    1605                 :         404 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    1606                 :         404 :     subid = form->oid;
    1607                 :             : 
    1608                 :             :     /* must be owner */
    1609         [ -  + ]:         404 :     if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
    1610                 :           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
    1611                 :           0 :                        stmt->subname);
    1612                 :             : 
    1613                 :             :     /* parse and check options */
    1614   [ +  +  +  +  :         404 :     switch (stmt->kind)
                +  +  + ]
    1615                 :             :     {
    1616                 :         193 :         case ALTER_SUBSCRIPTION_OPTIONS:
    1617                 :         193 :             supported_opts = (SUBOPT_SLOT_NAME |
    1618                 :             :                               SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
    1619                 :             :                               SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
    1620                 :             :                               SUBOPT_DISABLE_ON_ERR |
    1621                 :             :                               SUBOPT_PASSWORD_REQUIRED |
    1622                 :             :                               SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
    1623                 :             :                               SUBOPT_RETAIN_DEAD_TUPLES |
    1624                 :             :                               SUBOPT_MAX_RETENTION_DURATION |
    1625                 :             :                               SUBOPT_WAL_RECEIVER_TIMEOUT |
    1626                 :             :                               SUBOPT_ORIGIN |
    1627                 :             :                               SUBOPT_CONFLICT_LOG_DEST);
    1628                 :         193 :             break;
    1629                 :             : 
    1630                 :          79 :         case ALTER_SUBSCRIPTION_ENABLED:
    1631                 :          79 :             supported_opts = SUBOPT_ENABLED;
    1632                 :          79 :             break;
    1633                 :             : 
    1634                 :          19 :         case ALTER_SUBSCRIPTION_SET_PUBLICATION:
    1635                 :          19 :             supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
    1636                 :          19 :             break;
    1637                 :             : 
    1638                 :          35 :         case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
    1639                 :             :         case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
    1640                 :          35 :             supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
    1641                 :          35 :             break;
    1642                 :             : 
    1643                 :          37 :         case ALTER_SUBSCRIPTION_REFRESH_PUBLICATION:
    1644                 :          37 :             supported_opts = SUBOPT_COPY_DATA;
    1645                 :          37 :             break;
    1646                 :             : 
    1647                 :          15 :         case ALTER_SUBSCRIPTION_SKIP:
    1648                 :          15 :             supported_opts = SUBOPT_LSN;
    1649                 :          15 :             break;
    1650                 :             : 
    1651                 :          26 :         default:
    1652                 :          26 :             supported_opts = 0;
    1653                 :          26 :             break;
    1654                 :             :     }
    1655                 :             : 
    1656         [ +  + ]:         404 :     if (supported_opts > 0)
    1657                 :         378 :         parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
    1658                 :             : 
    1659                 :             :     /*
    1660                 :             :      * Ensure that ALTER SUBSCRIPTION commands that could be used to fix a
    1661                 :             :      * broken connection or prepare to drop a broken subscription don't
    1662                 :             :      * attempt to construct the conninfo. Otherwise, we might encounter the
    1663                 :             :      * error the user is trying to fix.
    1664                 :             :      *
    1665                 :             :      * Specifically, ALTER SUBSCRIPTION DISABLE, ALTER SUBSCRIPTION SERVER,
    1666                 :             :      * ALTER SUBSCRIPTION CONNECTION, or ALTER SUBSCRIPTION SET
    1667                 :             :      * (slot_name=NONE).
    1668                 :             :      *
    1669                 :             :      * NB: if the user specifies multiple SET options, then we may still need
    1670                 :             :      * to construct conninfo even if slot_name is set to NONE.
    1671                 :             :      */
    1672         [ +  + ]:         380 :     if (stmt->kind == ALTER_SUBSCRIPTION_ENABLED)
    1673                 :             :     {
    1674   [ +  -  +  + ]:          79 :         if (opts.specified_opts == SUBOPT_ENABLED && !opts.enabled)
    1675                 :          45 :             orig_conninfo_needed = false;
    1676                 :             :     }
    1677         [ +  + ]:         301 :     else if (stmt->kind == ALTER_SUBSCRIPTION_SERVER ||
    1678         [ +  + ]:         300 :              stmt->kind == ALTER_SUBSCRIPTION_CONNECTION)
    1679                 :             :     {
    1680                 :          23 :         orig_conninfo_needed = false;
    1681                 :             :     }
    1682         [ +  + ]:         278 :     else if (stmt->kind == ALTER_SUBSCRIPTION_OPTIONS)
    1683                 :             :     {
    1684                 :             :         /* ... SET (slot_name = NONE) with no other options */
    1685   [ +  +  +  + ]:         173 :         if (opts.specified_opts == SUBOPT_SLOT_NAME && !opts.slot_name)
    1686                 :          68 :             orig_conninfo_needed = false;
    1687                 :             :     }
    1688                 :             : 
    1689                 :             :     /*
    1690                 :             :      * Skip ACL checks on the subscription's foreign server, if any. If
    1691                 :             :      * changing the server (or replacing it with a raw connection), then the
    1692                 :             :      * old one will be removed anyway. If changing something unrelated,
    1693                 :             :      * there's no need to do an additional ACL check here; that will be done
    1694                 :             :      * by the subscription worker.
    1695                 :             :      */
    1696                 :         380 :     sub = GetSubscription(subid, false, orig_conninfo_needed, false);
    1697                 :             : 
    1698                 :         380 :     retain_dead_tuples = sub->retaindeadtuples;
    1699                 :         380 :     origin = sub->origin;
    1700                 :         380 :     max_retention = sub->maxretention;
    1701                 :         380 :     retention_active = sub->retentionactive;
    1702                 :             : 
    1703                 :             :     /*
    1704                 :             :      * Don't allow non-superuser modification of a subscription with
    1705                 :             :      * password_required=false.
    1706                 :             :      */
    1707   [ +  +  -  + ]:         380 :     if (!sub->passwordrequired && !superuser())
    1708         [ #  # ]:           0 :         ereport(ERROR,
    1709                 :             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1710                 :             :                  errmsg("password_required=false is superuser-only"),
    1711                 :             :                  errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
    1712                 :             : 
    1713                 :             :     /* Lock the subscription so nobody else can do anything with it. */
    1714                 :         380 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
    1715                 :             : 
    1716                 :             :     /* Form a new tuple. */
    1717                 :         380 :     memset(values, 0, sizeof(values));
    1718                 :         380 :     memset(nulls, false, sizeof(nulls));
    1719                 :         380 :     memset(replaces, false, sizeof(replaces));
    1720                 :             : 
    1721                 :         380 :     ObjectAddressSet(myself, SubscriptionRelationId, subid);
    1722                 :             : 
    1723   [ +  +  +  +  :         380 :     switch (stmt->kind)
          +  +  +  +  +  
                      - ]
    1724                 :             :     {
    1725                 :         173 :         case ALTER_SUBSCRIPTION_OPTIONS:
    1726                 :             :             {
    1727         [ +  + ]:         173 :                 if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
    1728                 :             :                 {
    1729                 :             :                     /*
    1730                 :             :                      * The subscription must be disabled to allow slot_name as
    1731                 :             :                      * 'none', otherwise, the apply worker will repeatedly try
    1732                 :             :                      * to stream the data using that slot_name which neither
    1733                 :             :                      * exists on the publisher nor the user will be allowed to
    1734                 :             :                      * create it.
    1735                 :             :                      */
    1736   [ -  +  -  - ]:          72 :                     if (sub->enabled && !opts.slot_name)
    1737         [ #  # ]:           0 :                         ereport(ERROR,
    1738                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1739                 :             :                                  errmsg("cannot set %s for enabled subscription",
    1740                 :             :                                         "slot_name = NONE")));
    1741                 :             : 
    1742         [ +  + ]:          72 :                     if (opts.slot_name)
    1743                 :           4 :                         values[Anum_pg_subscription_subslotname - 1] =
    1744                 :           4 :                             DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
    1745                 :             :                     else
    1746                 :          68 :                         nulls[Anum_pg_subscription_subslotname - 1] = true;
    1747                 :          72 :                     replaces[Anum_pg_subscription_subslotname - 1] = true;
    1748                 :             :                 }
    1749                 :             : 
    1750         [ +  + ]:         173 :                 if (opts.synchronous_commit)
    1751                 :             :                 {
    1752                 :           4 :                     values[Anum_pg_subscription_subsynccommit - 1] =
    1753                 :           4 :                         CStringGetTextDatum(opts.synchronous_commit);
    1754                 :           4 :                     replaces[Anum_pg_subscription_subsynccommit - 1] = true;
    1755                 :             :                 }
    1756                 :             : 
    1757         [ +  + ]:         173 :                 if (IsSet(opts.specified_opts, SUBOPT_BINARY))
    1758                 :             :                 {
    1759                 :          10 :                     values[Anum_pg_subscription_subbinary - 1] =
    1760                 :          10 :                         BoolGetDatum(opts.binary);
    1761                 :          10 :                     replaces[Anum_pg_subscription_subbinary - 1] = true;
    1762                 :             :                 }
    1763                 :             : 
    1764         [ +  + ]:         173 :                 if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
    1765                 :             :                 {
    1766                 :          18 :                     values[Anum_pg_subscription_substream - 1] =
    1767                 :          18 :                         CharGetDatum(opts.streaming);
    1768                 :          18 :                     replaces[Anum_pg_subscription_substream - 1] = true;
    1769                 :             :                 }
    1770                 :             : 
    1771         [ +  + ]:         173 :                 if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
    1772                 :             :                 {
    1773                 :             :                     values[Anum_pg_subscription_subdisableonerr - 1]
    1774                 :           4 :                         = BoolGetDatum(opts.disableonerr);
    1775                 :             :                     replaces[Anum_pg_subscription_subdisableonerr - 1]
    1776                 :           4 :                         = true;
    1777                 :             :                 }
    1778                 :             : 
    1779         [ +  + ]:         173 :                 if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
    1780                 :             :                 {
    1781                 :             :                     /* Non-superuser may not disable password_required. */
    1782   [ +  +  -  + ]:           8 :                     if (!opts.passwordrequired && !superuser())
    1783         [ #  # ]:           0 :                         ereport(ERROR,
    1784                 :             :                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1785                 :             :                                  errmsg("password_required=false is superuser-only"),
    1786                 :             :                                  errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
    1787                 :             : 
    1788                 :             :                     values[Anum_pg_subscription_subpasswordrequired - 1]
    1789                 :           8 :                         = BoolGetDatum(opts.passwordrequired);
    1790                 :             :                     replaces[Anum_pg_subscription_subpasswordrequired - 1]
    1791                 :           8 :                         = true;
    1792                 :             :                 }
    1793                 :             : 
    1794         [ +  + ]:         173 :                 if (IsSet(opts.specified_opts, SUBOPT_RUN_AS_OWNER))
    1795                 :             :                 {
    1796                 :           9 :                     values[Anum_pg_subscription_subrunasowner - 1] =
    1797                 :           9 :                         BoolGetDatum(opts.runasowner);
    1798                 :           9 :                     replaces[Anum_pg_subscription_subrunasowner - 1] = true;
    1799                 :             :                 }
    1800                 :             : 
    1801         [ +  + ]:         173 :                 if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
    1802                 :             :                 {
    1803                 :             :                     /*
    1804                 :             :                      * We need to update both the slot and the subscription
    1805                 :             :                      * for the two_phase option. We can enable the two_phase
    1806                 :             :                      * option for a slot only once the initial data
    1807                 :             :                      * synchronization is done. This is to avoid missing some
    1808                 :             :                      * data as explained in comments atop worker.c.
    1809                 :             :                      */
    1810                 :           3 :                     update_two_phase = !opts.twophase;
    1811                 :             : 
    1812                 :           3 :                     CheckAlterSubOption(sub, "two_phase", update_two_phase,
    1813                 :             :                                         isTopLevel);
    1814                 :             : 
    1815                 :             :                     /*
    1816                 :             :                      * Modifying the two_phase slot option requires a slot
    1817                 :             :                      * lookup by slot name, so changing the slot name at the
    1818                 :             :                      * same time is not allowed.
    1819                 :             :                      */
    1820         [ +  + ]:           3 :                     if (update_two_phase &&
    1821         [ -  + ]:           1 :                         IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
    1822         [ #  # ]:           0 :                         ereport(ERROR,
    1823                 :             :                                 (errcode(ERRCODE_SYNTAX_ERROR),
    1824                 :             :                                  errmsg("\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
    1825                 :             : 
    1826                 :             :                     /*
    1827                 :             :                      * Note that workers may still survive even if the
    1828                 :             :                      * subscription has been disabled.
    1829                 :             :                      *
    1830                 :             :                      * Ensure workers have already been exited to avoid
    1831                 :             :                      * getting prepared transactions while we are disabling
    1832                 :             :                      * the two_phase option. Otherwise, the changes of an
    1833                 :             :                      * already prepared transaction can be replicated again
    1834                 :             :                      * along with its corresponding commit, leading to
    1835                 :             :                      * duplicate data or errors.
    1836                 :             :                      */
    1837         [ -  + ]:           3 :                     if (logicalrep_workers_find(subid, true, true))
    1838         [ #  # ]:           0 :                         ereport(ERROR,
    1839                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1840                 :             :                                  errmsg("cannot alter \"two_phase\" when logical replication worker is still running"),
    1841                 :             :                                  errhint("Try again after some time.")));
    1842                 :             : 
    1843                 :             :                     /*
    1844                 :             :                      * two_phase cannot be disabled if there are any
    1845                 :             :                      * uncommitted prepared transactions present otherwise it
    1846                 :             :                      * can lead to duplicate data or errors as explained in
    1847                 :             :                      * the comment above.
    1848                 :             :                      */
    1849         [ +  + ]:           3 :                     if (update_two_phase &&
    1850         [ +  - ]:           1 :                         sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
    1851         [ -  + ]:           1 :                         LookupGXactBySubid(subid))
    1852         [ #  # ]:           0 :                         ereport(ERROR,
    1853                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1854                 :             :                                  errmsg("cannot disable \"two_phase\" when prepared transactions exist"),
    1855                 :             :                                  errhint("Resolve these transactions and try again.")));
    1856                 :             : 
    1857                 :             :                     /* Change system catalog accordingly */
    1858                 :           3 :                     values[Anum_pg_subscription_subtwophasestate - 1] =
    1859         [ +  + ]:           3 :                         CharGetDatum(opts.twophase ?
    1860                 :             :                                      LOGICALREP_TWOPHASE_STATE_PENDING :
    1861                 :             :                                      LOGICALREP_TWOPHASE_STATE_DISABLED);
    1862                 :           3 :                     replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
    1863                 :             :                 }
    1864                 :             : 
    1865         [ +  + ]:         173 :                 if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
    1866                 :             :                 {
    1867                 :             :                     /*
    1868                 :             :                      * Similar to the two_phase case above, we need to update
    1869                 :             :                      * the failover option for both the slot and the
    1870                 :             :                      * subscription.
    1871                 :             :                      */
    1872                 :           9 :                     update_failover = true;
    1873                 :             : 
    1874                 :           9 :                     CheckAlterSubOption(sub, "failover", update_failover,
    1875                 :             :                                         isTopLevel);
    1876                 :             : 
    1877                 :           4 :                     values[Anum_pg_subscription_subfailover - 1] =
    1878                 :           4 :                         BoolGetDatum(opts.failover);
    1879                 :           4 :                     replaces[Anum_pg_subscription_subfailover - 1] = true;
    1880                 :             :                 }
    1881                 :             : 
    1882         [ +  + ]:         168 :                 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
    1883                 :             :                 {
    1884                 :           2 :                     values[Anum_pg_subscription_subretaindeadtuples - 1] =
    1885                 :           2 :                         BoolGetDatum(opts.retaindeadtuples);
    1886                 :           2 :                     replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true;
    1887                 :             : 
    1888                 :             :                     /*
    1889                 :             :                      * Update the retention status only if there's a change in
    1890                 :             :                      * the retain_dead_tuples option value.
    1891                 :             :                      *
    1892                 :             :                      * Automatically marking retention as active when
    1893                 :             :                      * retain_dead_tuples is enabled may not always be ideal,
    1894                 :             :                      * especially if retention was previously stopped and the
    1895                 :             :                      * user toggles retain_dead_tuples without adjusting the
    1896                 :             :                      * publisher workload. However, this behavior provides a
    1897                 :             :                      * convenient way for users to manually refresh the
    1898                 :             :                      * retention status. Since retention will be stopped again
    1899                 :             :                      * unless the publisher workload is reduced, this approach
    1900                 :             :                      * is acceptable for now.
    1901                 :             :                      */
    1902         [ +  - ]:           2 :                     if (opts.retaindeadtuples != sub->retaindeadtuples)
    1903                 :             :                     {
    1904                 :           2 :                         values[Anum_pg_subscription_subretentionactive - 1] =
    1905                 :           2 :                             BoolGetDatum(opts.retaindeadtuples);
    1906                 :           2 :                         replaces[Anum_pg_subscription_subretentionactive - 1] = true;
    1907                 :             : 
    1908                 :           2 :                         retention_active = opts.retaindeadtuples;
    1909                 :             :                     }
    1910                 :             : 
    1911                 :           2 :                     CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
    1912                 :             : 
    1913                 :             :                     /*
    1914                 :             :                      * Workers may continue running even after the
    1915                 :             :                      * subscription has been disabled.
    1916                 :             :                      *
    1917                 :             :                      * To prevent race conditions (as described in
    1918                 :             :                      * CheckAlterSubOption()), ensure that all worker
    1919                 :             :                      * processes have already exited before proceeding.
    1920                 :             :                      */
    1921         [ -  + ]:           1 :                     if (logicalrep_workers_find(subid, true, true))
    1922         [ #  # ]:           0 :                         ereport(ERROR,
    1923                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1924                 :             :                                  errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
    1925                 :             :                                  errhint("Try again after some time.")));
    1926                 :             : 
    1927                 :             :                     /*
    1928                 :             :                      * Notify the launcher to manage the replication slot for
    1929                 :             :                      * conflict detection. This ensures that replication slot
    1930                 :             :                      * is efficiently handled (created, updated, or dropped)
    1931                 :             :                      * in response to any configuration changes.
    1932                 :             :                      */
    1933                 :           1 :                     ApplyLauncherWakeupAtCommit();
    1934                 :             : 
    1935                 :           1 :                     check_pub_rdt = opts.retaindeadtuples;
    1936                 :           1 :                     retain_dead_tuples = opts.retaindeadtuples;
    1937                 :             :                 }
    1938                 :             : 
    1939         [ +  + ]:         167 :                 if (IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
    1940                 :             :                 {
    1941                 :           6 :                     values[Anum_pg_subscription_submaxretention - 1] =
    1942                 :           6 :                         Int32GetDatum(opts.maxretention);
    1943                 :           6 :                     replaces[Anum_pg_subscription_submaxretention - 1] = true;
    1944                 :             : 
    1945                 :           6 :                     max_retention = opts.maxretention;
    1946                 :             :                 }
    1947                 :             : 
    1948                 :             :                 /*
    1949                 :             :                  * Ensure that system configuration parameters are set
    1950                 :             :                  * appropriately to support retain_dead_tuples and
    1951                 :             :                  * max_retention_duration.
    1952                 :             :                  */
    1953         [ +  + ]:         167 :                 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) ||
    1954         [ +  + ]:         166 :                     IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
    1955                 :           7 :                     CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE,
    1956                 :             :                                                retain_dead_tuples,
    1957                 :             :                                                retention_active,
    1958                 :           7 :                                                (max_retention > 0));
    1959                 :             : 
    1960         [ +  + ]:         167 :                 if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
    1961                 :             :                 {
    1962                 :           6 :                     values[Anum_pg_subscription_suborigin - 1] =
    1963                 :           6 :                         CStringGetTextDatum(opts.origin);
    1964                 :           6 :                     replaces[Anum_pg_subscription_suborigin - 1] = true;
    1965                 :             : 
    1966                 :             :                     /*
    1967                 :             :                      * Check if changes from different origins may be received
    1968                 :             :                      * from the publisher when the origin is changed to ANY
    1969                 :             :                      * and retain_dead_tuples is enabled. Use |= so that we
    1970                 :             :                      * don't clear the flag already set when
    1971                 :             :                      * retain_dead_tuples was changed in the same command.
    1972                 :             :                      */
    1973         [ +  + ]:           8 :                     check_pub_rdt |= retain_dead_tuples &&
    1974         [ +  + ]:           2 :                         pg_strcasecmp(opts.origin, LOGICALREP_ORIGIN_ANY) == 0;
    1975                 :             : 
    1976                 :           6 :                     origin = opts.origin;
    1977                 :             :                 }
    1978                 :             : 
    1979         [ +  + ]:         167 :                 if (IsSet(opts.specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
    1980                 :             :                 {
    1981                 :           8 :                     values[Anum_pg_subscription_subwalrcvtimeout - 1] =
    1982                 :           8 :                         CStringGetTextDatum(opts.wal_receiver_timeout);
    1983                 :           8 :                     replaces[Anum_pg_subscription_subwalrcvtimeout - 1] = true;
    1984                 :             :                 }
    1985                 :             : 
    1986         [ +  + ]:         167 :                 if (IsSet(opts.specified_opts, SUBOPT_CONFLICT_LOG_DEST))
    1987                 :             :                 {
    1988                 :             :                     ConflictLogDest old_dest =
    1989                 :          16 :                         GetConflictLogDest(sub->conflictlogdest);
    1990                 :             : 
    1991         [ +  + ]:          16 :                     if (opts.conflictlogdest != old_dest)
    1992                 :             :                     {
    1993                 :             :                         bool        update_relid;
    1994                 :          12 :                         Oid         relid = InvalidOid;
    1995                 :             : 
    1996                 :          12 :                         values[Anum_pg_subscription_subconflictlogdest - 1] =
    1997                 :          12 :                             CStringGetTextDatum(ConflictLogDestNames[opts.conflictlogdest]);
    1998                 :          12 :                         replaces[Anum_pg_subscription_subconflictlogdest - 1] = true;
    1999                 :             : 
    2000                 :          12 :                         update_relid = alter_sub_conflict_log_dest(sub,
    2001                 :             :                                                                    old_dest,
    2002                 :             :                                                                    opts.conflictlogdest,
    2003                 :             :                                                                    &relid);
    2004         [ +  + ]:          12 :                         if (update_relid)
    2005                 :             :                         {
    2006                 :           8 :                             values[Anum_pg_subscription_subconflictlogrelid - 1] =
    2007                 :           8 :                                 ObjectIdGetDatum(relid);
    2008                 :           8 :                             replaces[Anum_pg_subscription_subconflictlogrelid - 1] =
    2009                 :             :                                 true;
    2010                 :             :                         }
    2011                 :             :                     }
    2012                 :             :                 }
    2013                 :             : 
    2014                 :         167 :                 update_tuple = true;
    2015                 :         167 :                 break;
    2016                 :             :             }
    2017                 :             : 
    2018                 :          79 :         case ALTER_SUBSCRIPTION_ENABLED:
    2019                 :             :             {
    2020                 :             :                 Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
    2021                 :             : 
    2022   [ +  +  +  - ]:          79 :                 if (!sub->slotname && opts.enabled)
    2023         [ +  - ]:           4 :                     ereport(ERROR,
    2024                 :             :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2025                 :             :                              errmsg("cannot enable subscription that does not have a slot name")));
    2026                 :             : 
    2027                 :             :                 /*
    2028                 :             :                  * Check track_commit_timestamp only when enabling the
    2029                 :             :                  * subscription in case it was disabled after creation. See
    2030                 :             :                  * comments atop CheckSubDeadTupleRetention() for details.
    2031                 :             :                  */
    2032                 :          75 :                 CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
    2033                 :          75 :                                            WARNING, sub->retaindeadtuples,
    2034                 :          75 :                                            sub->retentionactive, false);
    2035                 :             : 
    2036                 :          75 :                 values[Anum_pg_subscription_subenabled - 1] =
    2037                 :          75 :                     BoolGetDatum(opts.enabled);
    2038                 :          75 :                 replaces[Anum_pg_subscription_subenabled - 1] = true;
    2039                 :             : 
    2040         [ +  + ]:          75 :                 if (opts.enabled)
    2041                 :          30 :                     ApplyLauncherWakeupAtCommit();
    2042                 :             : 
    2043                 :          75 :                 update_tuple = true;
    2044                 :             : 
    2045                 :             :                 /*
    2046                 :             :                  * The subscription might be initially created with
    2047                 :             :                  * connect=false and retain_dead_tuples=true, meaning the
    2048                 :             :                  * remote server's status may not be checked. Ensure this
    2049                 :             :                  * check is conducted now.
    2050                 :             :                  */
    2051   [ +  +  +  + ]:          75 :                 check_pub_rdt = sub->retaindeadtuples && opts.enabled;
    2052                 :          75 :                 break;
    2053                 :             :             }
    2054                 :             : 
    2055                 :           1 :         case ALTER_SUBSCRIPTION_SERVER:
    2056                 :             :             {
    2057                 :             :                 ForeignServer *new_server;
    2058                 :             :                 ObjectAddress referenced;
    2059                 :             :                 AclResult   aclresult;
    2060                 :             : 
    2061                 :             :                 /*
    2062                 :             :                  * Remove what was there before, either another foreign server
    2063                 :             :                  * or a connection string.
    2064                 :             :                  */
    2065         [ -  + ]:           1 :                 if (form->subserver)
    2066                 :             :                 {
    2067                 :           0 :                     deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
    2068                 :             :                                                        DEPENDENCY_NORMAL,
    2069                 :             :                                                        ForeignServerRelationId, form->subserver);
    2070                 :             :                 }
    2071                 :             :                 else
    2072                 :             :                 {
    2073                 :           1 :                     nulls[Anum_pg_subscription_subconninfo - 1] = true;
    2074                 :           1 :                     replaces[Anum_pg_subscription_subconninfo - 1] = true;
    2075                 :             :                 }
    2076                 :             : 
    2077                 :             :                 /*
    2078                 :             :                  * Check that the subscription owner has USAGE privileges on
    2079                 :             :                  * the server.
    2080                 :             :                  */
    2081                 :           1 :                 new_server = GetForeignServerByName(stmt->servername, false);
    2082                 :           1 :                 aclresult = object_aclcheck(ForeignServerRelationId,
    2083                 :             :                                             new_server->serverid,
    2084                 :             :                                             form->subowner, ACL_USAGE);
    2085         [ -  + ]:           1 :                 if (aclresult != ACLCHECK_OK)
    2086         [ #  # ]:           0 :                     ereport(ERROR,
    2087                 :             :                             errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    2088                 :             :                             errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
    2089                 :             :                                    GetUserNameFromId(form->subowner, false),
    2090                 :             :                                    new_server->servername));
    2091                 :             : 
    2092                 :             :                 /* make sure a user mapping exists */
    2093                 :           1 :                 GetUserMapping(form->subowner, new_server->serverid);
    2094                 :             : 
    2095                 :           1 :                 new_conninfo = ForeignServerConnectionString(form->subowner,
    2096                 :             :                                                              new_server);
    2097                 :             : 
    2098                 :             :                 /* Load the library providing us libpq calls. */
    2099                 :           1 :                 load_file("libpqwalreceiver", false);
    2100                 :             :                 /* Check the connection info string. */
    2101   [ -  +  -  - ]:           1 :                 walrcv_check_conninfo(new_conninfo,
    2102                 :             :                                       sub->passwordrequired && !sub->ownersuperuser);
    2103                 :             : 
    2104                 :           1 :                 values[Anum_pg_subscription_subserver - 1] = ObjectIdGetDatum(new_server->serverid);
    2105                 :           1 :                 replaces[Anum_pg_subscription_subserver - 1] = true;
    2106                 :             : 
    2107                 :           1 :                 ObjectAddressSet(referenced, ForeignServerRelationId, new_server->serverid);
    2108                 :           1 :                 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
    2109                 :             : 
    2110                 :           1 :                 update_tuple = true;
    2111                 :             :             }
    2112                 :             : 
    2113                 :             :             /*
    2114                 :             :              * Since the remote server configuration might have changed,
    2115                 :             :              * perform a check to ensure it permits enabling
    2116                 :             :              * retain_dead_tuples.
    2117                 :             :              */
    2118                 :           1 :             check_pub_rdt = sub->retaindeadtuples;
    2119                 :           1 :             break;
    2120                 :             : 
    2121                 :          22 :         case ALTER_SUBSCRIPTION_CONNECTION:
    2122                 :             :             /* remove reference to foreign server and dependencies, if present */
    2123         [ +  + ]:          22 :             if (form->subserver)
    2124                 :             :             {
    2125                 :           9 :                 deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
    2126                 :             :                                                    DEPENDENCY_NORMAL,
    2127                 :             :                                                    ForeignServerRelationId, form->subserver);
    2128                 :             : 
    2129                 :           9 :                 values[Anum_pg_subscription_subserver - 1] = ObjectIdGetDatum(InvalidOid);
    2130                 :           9 :                 replaces[Anum_pg_subscription_subserver - 1] = true;
    2131                 :             :             }
    2132                 :             : 
    2133                 :          22 :             new_conninfo = stmt->conninfo;
    2134                 :             : 
    2135                 :             :             /* Load the library providing us libpq calls. */
    2136                 :          22 :             load_file("libpqwalreceiver", false);
    2137                 :             :             /* Check the connection info string. */
    2138   [ +  +  +  + ]:          22 :             walrcv_check_conninfo(new_conninfo,
    2139                 :             :                                   sub->passwordrequired && !sub->ownersuperuser);
    2140                 :             : 
    2141                 :          18 :             values[Anum_pg_subscription_subconninfo - 1] =
    2142                 :          18 :                 CStringGetTextDatum(stmt->conninfo);
    2143                 :          18 :             replaces[Anum_pg_subscription_subconninfo - 1] = true;
    2144                 :          18 :             update_tuple = true;
    2145                 :             : 
    2146                 :             :             /*
    2147                 :             :              * Since the remote server configuration might have changed,
    2148                 :             :              * perform a check to ensure it permits enabling
    2149                 :             :              * retain_dead_tuples.
    2150                 :             :              */
    2151                 :          18 :             check_pub_rdt = sub->retaindeadtuples;
    2152                 :          18 :             break;
    2153                 :             : 
    2154                 :          19 :         case ALTER_SUBSCRIPTION_SET_PUBLICATION:
    2155                 :             :             {
    2156                 :          19 :                 values[Anum_pg_subscription_subpublications - 1] =
    2157                 :          19 :                     publicationListToArray(stmt->publication);
    2158                 :          19 :                 replaces[Anum_pg_subscription_subpublications - 1] = true;
    2159                 :             : 
    2160                 :          19 :                 update_tuple = true;
    2161                 :             : 
    2162                 :             :                 /* Refresh if user asked us to. */
    2163         [ +  + ]:          19 :                 if (opts.refresh)
    2164                 :             :                 {
    2165         [ -  + ]:          15 :                     if (!sub->enabled)
    2166         [ #  # ]:           0 :                         ereport(ERROR,
    2167                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2168                 :             :                                  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
    2169                 :             :                                  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
    2170                 :             : 
    2171                 :             :                     /*
    2172                 :             :                      * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
    2173                 :             :                      * why this is not allowed.
    2174                 :             :                      */
    2175   [ -  +  -  - ]:          15 :                     if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
    2176         [ #  # ]:           0 :                         ereport(ERROR,
    2177                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2178                 :             :                                  errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
    2179                 :             :                                  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
    2180                 :             : 
    2181                 :          15 :                     PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
    2182                 :             : 
    2183                 :             :                     /* Make sure refresh sees the new list of publications. */
    2184                 :           7 :                     sub->publications = stmt->publication;
    2185                 :             : 
    2186                 :           7 :                     AlterSubscription_refresh(sub, opts.copy_data,
    2187                 :             :                                               stmt->publication);
    2188                 :             :                 }
    2189                 :             : 
    2190                 :          11 :                 break;
    2191                 :             :             }
    2192                 :             : 
    2193                 :          35 :         case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
    2194                 :             :         case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
    2195                 :             :             {
    2196                 :             :                 List       *publist;
    2197                 :          35 :                 bool        isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
    2198                 :             : 
    2199                 :          35 :                 publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
    2200                 :          11 :                 values[Anum_pg_subscription_subpublications - 1] =
    2201                 :          11 :                     publicationListToArray(publist);
    2202                 :          11 :                 replaces[Anum_pg_subscription_subpublications - 1] = true;
    2203                 :             : 
    2204                 :          11 :                 update_tuple = true;
    2205                 :             : 
    2206                 :             :                 /* Refresh if user asked us to. */
    2207         [ +  + ]:          11 :                 if (opts.refresh)
    2208                 :             :                 {
    2209                 :             :                     /* We only need to validate user specified publications. */
    2210         [ +  + ]:           3 :                     List       *validate_publications = (isadd) ? stmt->publication : NULL;
    2211                 :             : 
    2212         [ -  + ]:           3 :                     if (!sub->enabled)
    2213   [ #  #  #  # ]:           0 :                         ereport(ERROR,
    2214                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2215                 :             :                                  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
    2216                 :             :                         /* translator: %s is an SQL ALTER command */
    2217                 :             :                                  errhint("Use %s instead.",
    2218                 :             :                                          isadd ?
    2219                 :             :                                          "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
    2220                 :             :                                          "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
    2221                 :             : 
    2222                 :             :                     /*
    2223                 :             :                      * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
    2224                 :             :                      * why this is not allowed.
    2225                 :             :                      */
    2226   [ -  +  -  - ]:           3 :                     if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
    2227   [ #  #  #  # ]:           0 :                         ereport(ERROR,
    2228                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2229                 :             :                                  errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
    2230                 :             :                         /* translator: %s is an SQL ALTER command */
    2231                 :             :                                  errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
    2232                 :             :                                          isadd ?
    2233                 :             :                                          "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
    2234                 :             :                                          "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
    2235                 :             : 
    2236                 :           3 :                     PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
    2237                 :             : 
    2238                 :             :                     /* Refresh the new list of publications. */
    2239                 :           3 :                     sub->publications = publist;
    2240                 :             : 
    2241                 :           3 :                     AlterSubscription_refresh(sub, opts.copy_data,
    2242                 :             :                                               validate_publications);
    2243                 :             :                 }
    2244                 :             : 
    2245                 :          11 :                 break;
    2246                 :             :             }
    2247                 :             : 
    2248                 :          37 :         case ALTER_SUBSCRIPTION_REFRESH_PUBLICATION:
    2249                 :             :             {
    2250         [ +  + ]:          37 :                 if (!sub->enabled)
    2251         [ +  - ]:           4 :                     ereport(ERROR,
    2252                 :             :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2253                 :             :                              errmsg("%s is not allowed for disabled subscriptions",
    2254                 :             :                                     "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
    2255                 :             : 
    2256                 :             :                 /*
    2257                 :             :                  * The subscription option "two_phase" requires that
    2258                 :             :                  * replication has passed the initial table synchronization
    2259                 :             :                  * phase before the two_phase becomes properly enabled.
    2260                 :             :                  *
    2261                 :             :                  * But, having reached this two-phase commit "enabled" state
    2262                 :             :                  * we must not allow any subsequent table initialization to
    2263                 :             :                  * occur. So the ALTER SUBSCRIPTION ... REFRESH PUBLICATION is
    2264                 :             :                  * disallowed when the user had requested two_phase = on mode.
    2265                 :             :                  *
    2266                 :             :                  * The exception to this restriction is when copy_data =
    2267                 :             :                  * false, because when copy_data is false the tablesync will
    2268                 :             :                  * start already in READY state and will exit directly without
    2269                 :             :                  * doing anything.
    2270                 :             :                  *
    2271                 :             :                  * For more details see comments atop worker.c.
    2272                 :             :                  */
    2273   [ -  +  -  - ]:          33 :                 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
    2274         [ #  # ]:           0 :                     ereport(ERROR,
    2275                 :             :                             (errcode(ERRCODE_SYNTAX_ERROR),
    2276                 :             :                              errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"),
    2277                 :             :                              errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
    2278                 :             : 
    2279                 :          33 :                 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION");
    2280                 :             : 
    2281                 :          29 :                 AlterSubscription_refresh(sub, opts.copy_data, NULL);
    2282                 :             : 
    2283                 :          28 :                 break;
    2284                 :             :             }
    2285                 :             : 
    2286                 :           3 :         case ALTER_SUBSCRIPTION_REFRESH_SEQUENCES:
    2287                 :             :             {
    2288         [ -  + ]:           3 :                 if (!sub->enabled)
    2289         [ #  # ]:           0 :                     ereport(ERROR,
    2290                 :             :                             errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2291                 :             :                             errmsg("%s is not allowed for disabled subscriptions",
    2292                 :             :                                    "ALTER SUBSCRIPTION ... REFRESH SEQUENCES"));
    2293                 :             : 
    2294                 :           3 :                 AlterSubscription_refresh_seq(sub);
    2295                 :             : 
    2296                 :           3 :                 break;
    2297                 :             :             }
    2298                 :             : 
    2299                 :          11 :         case ALTER_SUBSCRIPTION_SKIP:
    2300                 :             :             {
    2301                 :             :                 /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
    2302                 :             :                 Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
    2303                 :             : 
    2304                 :             :                 /*
    2305                 :             :                  * If the user sets subskiplsn, we do a sanity check to make
    2306                 :             :                  * sure that the specified LSN is a probable value.
    2307                 :             :                  */
    2308         [ +  + ]:          11 :                 if (XLogRecPtrIsValid(opts.lsn))
    2309                 :             :                 {
    2310                 :             :                     ReplOriginId originid;
    2311                 :             :                     char        originname[NAMEDATALEN];
    2312                 :             :                     XLogRecPtr  remote_lsn;
    2313                 :             : 
    2314                 :           7 :                     ReplicationOriginNameForLogicalRep(subid, InvalidOid,
    2315                 :             :                                                        originname, sizeof(originname));
    2316                 :           7 :                     originid = replorigin_by_name(originname, false);
    2317                 :           7 :                     remote_lsn = replorigin_get_progress(originid, false);
    2318                 :             : 
    2319                 :             :                     /* Check the given LSN is at least a future LSN */
    2320   [ +  +  -  + ]:           7 :                     if (XLogRecPtrIsValid(remote_lsn) && opts.lsn < remote_lsn)
    2321         [ #  # ]:           0 :                         ereport(ERROR,
    2322                 :             :                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2323                 :             :                                  errmsg("skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
    2324                 :             :                                         LSN_FORMAT_ARGS(opts.lsn),
    2325                 :             :                                         LSN_FORMAT_ARGS(remote_lsn))));
    2326                 :             :                 }
    2327                 :             : 
    2328                 :          11 :                 values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
    2329                 :          11 :                 replaces[Anum_pg_subscription_subskiplsn - 1] = true;
    2330                 :             : 
    2331                 :          11 :                 update_tuple = true;
    2332                 :          11 :                 break;
    2333                 :             :             }
    2334                 :             : 
    2335                 :           0 :         default:
    2336         [ #  # ]:           0 :             elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
    2337                 :             :                  stmt->kind);
    2338                 :             :     }
    2339                 :             : 
    2340                 :             :     /* Update the catalog if needed. */
    2341         [ +  + ]:         325 :     if (update_tuple)
    2342                 :             :     {
    2343                 :         294 :         tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
    2344                 :             :                                 replaces);
    2345                 :             : 
    2346                 :         294 :         CatalogTupleUpdate(rel, &tup->t_self, tup);
    2347                 :             : 
    2348                 :         294 :         heap_freetuple(tup);
    2349                 :             :     }
    2350                 :             : 
    2351                 :             :     /*
    2352                 :             :      * Try to acquire the connection necessary either for modifying the slot
    2353                 :             :      * or for checking if the remote server permits enabling
    2354                 :             :      * retain_dead_tuples.
    2355                 :             :      *
    2356                 :             :      * This has to be at the end because otherwise if there is an error while
    2357                 :             :      * doing the database operations we won't be able to rollback altered
    2358                 :             :      * slot.
    2359                 :             :      */
    2360   [ +  +  +  +  :         325 :     if (update_failover || update_two_phase || check_pub_rdt)
                   +  + ]
    2361                 :             :     {
    2362                 :             :         bool        must_use_password;
    2363                 :             :         char       *err;
    2364                 :             :         WalReceiverConn *wrconn;
    2365                 :             : 
    2366                 :             :         Assert(new_conninfo || orig_conninfo_needed);
    2367                 :             : 
    2368                 :             :         /* Load the library providing us libpq calls. */
    2369                 :          13 :         load_file("libpqwalreceiver", false);
    2370                 :             : 
    2371                 :             :         /*
    2372                 :             :          * Try to connect to the publisher, using the new connection string if
    2373                 :             :          * available.
    2374                 :             :          */
    2375   [ +  -  -  + ]:          13 :         must_use_password = sub->passwordrequired && !sub->ownersuperuser;
    2376         [ +  - ]:          13 :         wrconn = walrcv_connect(new_conninfo ? new_conninfo : sub->conninfo,
    2377                 :             :                                 true, true, must_use_password, sub->name,
    2378                 :             :                                 &err);
    2379         [ -  + ]:          13 :         if (!wrconn)
    2380         [ #  # ]:           0 :             ereport(ERROR,
    2381                 :             :                     (errcode(ERRCODE_CONNECTION_FAILURE),
    2382                 :             :                      errmsg("subscription \"%s\" could not connect to the publisher: %s",
    2383                 :             :                             sub->name, err)));
    2384                 :             : 
    2385         [ +  - ]:          13 :         PG_TRY();
    2386                 :             :         {
    2387         [ +  + ]:          13 :             if (retain_dead_tuples)
    2388                 :           9 :                 check_pub_dead_tuple_retention(wrconn);
    2389                 :             : 
    2390                 :          13 :             check_publications_origin_tables(wrconn, sub->publications, false,
    2391                 :             :                                              retain_dead_tuples, origin, NULL, 0,
    2392                 :             :                                              sub->name);
    2393                 :             : 
    2394   [ +  +  +  + ]:          13 :             if (update_failover || update_two_phase)
    2395   [ +  +  +  + ]:           5 :                 walrcv_alter_slot(wrconn, sub->slotname,
    2396                 :             :                                   update_failover ? &opts.failover : NULL,
    2397                 :             :                                   update_two_phase ? &opts.twophase : NULL);
    2398                 :             :         }
    2399                 :           0 :         PG_FINALLY();
    2400                 :             :         {
    2401                 :          13 :             walrcv_disconnect(wrconn);
    2402                 :             :         }
    2403         [ -  + ]:          13 :         PG_END_TRY();
    2404                 :             :     }
    2405                 :             : 
    2406                 :         325 :     table_close(rel, RowExclusiveLock);
    2407                 :             : 
    2408         [ -  + ]:         325 :     InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
    2409                 :             : 
    2410                 :             :     /* Wake up related replication workers to handle this change quickly. */
    2411                 :         325 :     LogicalRepWorkersWakeupAtCommit(subid);
    2412                 :             : 
    2413                 :         325 :     return myself;
    2414                 :             : }
    2415                 :             : 
    2416                 :             : /*
    2417                 :             :  * Construct conninfo from a subscription's server. Like libpqrcv_connect(),
    2418                 :             :  * if an error occurs, set *err to the error message and return NULL.
    2419                 :             :  *
    2420                 :             :  * However, failures in ForeignServerConnectionString() may ereport(ERROR),
    2421                 :             :  * and (also like libpqrcv_connect) it's not worth adding the machinery to
    2422                 :             :  * pass all of those back to the caller just to cover this one case.
    2423                 :             :  */
    2424                 :             : static char *
    2425                 :           8 : construct_subserver_conninfo(Oid subserver, Oid subowner, char **err)
    2426                 :             : {
    2427                 :             :     AclResult   aclresult;
    2428                 :             :     ForeignServer *server;
    2429                 :             : 
    2430                 :           8 :     *err = NULL;
    2431                 :             : 
    2432                 :           8 :     server = GetForeignServer(subserver);
    2433                 :             : 
    2434                 :           8 :     aclresult = object_aclcheck(ForeignServerRelationId, subserver,
    2435                 :             :                                 subowner, ACL_USAGE);
    2436         [ +  + ]:           8 :     if (aclresult != ACLCHECK_OK)
    2437                 :             :     {
    2438                 :             :         /*
    2439                 :             :          * Unable to generate connection string because permissions on the
    2440                 :             :          * foreign server have been removed. Follow the same logic as an
    2441                 :             :          * unusable subconninfo (which will result in an ERROR later unless
    2442                 :             :          * slot_name = NONE).
    2443                 :             :          */
    2444                 :           4 :         *err = psprintf(_("subscription owner \"%s\" does not have permission on foreign server \"%s\""),
    2445                 :             :                         GetUserNameFromId(subowner, false),
    2446                 :             :                         server->servername);
    2447                 :           4 :         return NULL;
    2448                 :             :     }
    2449                 :             : 
    2450                 :           4 :     return ForeignServerConnectionString(subowner, server);
    2451                 :             : }
    2452                 :             : 
    2453                 :             : /*
    2454                 :             :  * Drop subscription's conflict log table
    2455                 :             :  *
    2456                 :             :  * The conflict log table is registered as an internal dependency of the
    2457                 :             :  * subscription. This function removes the dependency by performing a
    2458                 :             :  * cascading deletion on the subscription object, which in turn drops the
    2459                 :             :  * associated conflict log table.
    2460                 :             :  *
    2461                 :             :  * This is used to clean up conflict log tables that are no longer required,
    2462                 :             :  * preventing accumulation of stale or orphaned relations.
    2463                 :             :  *
    2464                 :             :  * NOTE:
    2465                 :             :  * Only conflict log tables are currently managed via this internal dependency
    2466                 :             :  * mechanism.
    2467                 :             :  */
    2468                 :             : static void
    2469                 :         172 : drop_sub_conflict_log_table(Oid subid, char *subname, Oid subconflictlogrelid)
    2470                 :             : {
    2471                 :             :     /* Drop any dependent conflict log table */
    2472         [ +  + ]:         172 :     if (OidIsValid(subconflictlogrelid))
    2473                 :             :     {
    2474                 :             :         ObjectAddress object;
    2475                 :             :         char       *conflictrelname;
    2476                 :             : 
    2477                 :          13 :         conflictrelname = get_rel_name(subconflictlogrelid);
    2478         [ -  + ]:          13 :         if (conflictrelname == NULL)
    2479         [ #  # ]:           0 :             elog(ERROR, "cache lookup failed for relation %u",
    2480                 :             :                  subconflictlogrelid);
    2481                 :             : 
    2482                 :             :         /*
    2483                 :             :          * By using PERFORM_DELETION_SKIP_ORIGINAL, we ensure that only the
    2484                 :             :          * conflict log table is deleted while the subscription remains.
    2485                 :             :          */
    2486                 :          13 :         ObjectAddressSet(object, SubscriptionRelationId, subid);
    2487                 :          13 :         performDeletion(&object, DROP_CASCADE,
    2488                 :             :                         PERFORM_DELETION_INTERNAL |
    2489                 :             :                         PERFORM_DELETION_SKIP_ORIGINAL);
    2490                 :             : 
    2491         [ +  + ]:          13 :         ereport(NOTICE,
    2492                 :             :                 errmsg("dropped conflict log table \"%s\" for subscription \"%s\"",
    2493                 :             :                        get_qualified_objname(PG_CONFLICT_NAMESPACE, conflictrelname),
    2494                 :             :                        subname));
    2495                 :             :     }
    2496                 :         172 : }
    2497                 :             : 
    2498                 :             : /*
    2499                 :             :  * Drop a subscription
    2500                 :             :  */
    2501                 :             : void
    2502                 :         180 : DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
    2503                 :             : {
    2504                 :             :     Relation    rel;
    2505                 :             :     ObjectAddress myself;
    2506                 :             :     HeapTuple   tup;
    2507                 :             :     Oid         subid;
    2508                 :             :     Oid         subowner;
    2509                 :             :     Oid         subserver;
    2510                 :             :     Oid         subconflictlogrelid;
    2511                 :         180 :     char       *subconninfo = NULL;
    2512                 :             :     Datum       datum;
    2513                 :             :     bool        isnull;
    2514                 :             :     char       *subname;
    2515                 :         180 :     char       *conninfo = NULL;
    2516                 :             :     char       *slotname;
    2517                 :             :     List       *subworkers;
    2518                 :             :     ListCell   *lc;
    2519                 :             :     char        originname[NAMEDATALEN];
    2520                 :         180 :     char       *err = NULL;
    2521                 :         180 :     WalReceiverConn *wrconn = NULL;
    2522                 :             :     Form_pg_subscription form;
    2523                 :             :     List       *rstates;
    2524                 :             :     bool        must_use_password;
    2525                 :             : 
    2526                 :             :     /*
    2527                 :             :      * The launcher may concurrently start a new worker for this subscription.
    2528                 :             :      * During initialization, the worker checks for subscription validity and
    2529                 :             :      * exits if the subscription has already been dropped. See
    2530                 :             :      * InitializeLogRepWorker.
    2531                 :             :      */
    2532                 :         180 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    2533                 :             : 
    2534                 :         180 :     tup = SearchSysCache2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
    2535                 :         180 :                           CStringGetDatum(stmt->subname));
    2536                 :             : 
    2537         [ +  + ]:         180 :     if (!HeapTupleIsValid(tup))
    2538                 :             :     {
    2539                 :           8 :         table_close(rel, NoLock);
    2540                 :             : 
    2541         [ +  + ]:           8 :         if (!stmt->missing_ok)
    2542         [ +  - ]:           4 :             ereport(ERROR,
    2543                 :             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    2544                 :             :                      errmsg("subscription \"%s\" does not exist",
    2545                 :             :                             stmt->subname)));
    2546                 :             :         else
    2547         [ +  - ]:           4 :             ereport(NOTICE,
    2548                 :             :                     (errmsg("subscription \"%s\" does not exist, skipping",
    2549                 :             :                             stmt->subname)));
    2550                 :             : 
    2551                 :          85 :         return;
    2552                 :             :     }
    2553                 :             : 
    2554                 :         172 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
    2555                 :             :                             Anum_pg_subscription_subconninfo, &isnull);
    2556         [ +  + ]:         172 :     if (!isnull)
    2557                 :         155 :         subconninfo = TextDatumGetCString(datum);
    2558                 :             : 
    2559                 :         172 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    2560                 :         172 :     subid = form->oid;
    2561                 :         172 :     subowner = form->subowner;
    2562                 :         172 :     subserver = form->subserver;
    2563                 :         172 :     subconflictlogrelid = form->subconflictlogrelid;
    2564   [ +  +  +  + ]:         172 :     must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
    2565                 :             : 
    2566                 :             :     /* must be owner */
    2567         [ -  + ]:         172 :     if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
    2568                 :           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
    2569                 :           0 :                        stmt->subname);
    2570                 :             : 
    2571                 :             :     /* DROP hook for the subscription being removed */
    2572         [ -  + ]:         172 :     InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
    2573                 :             : 
    2574                 :             :     /*
    2575                 :             :      * Lock the subscription so nobody else can do anything with it (including
    2576                 :             :      * the replication workers).
    2577                 :             :      */
    2578                 :         172 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
    2579                 :             : 
    2580                 :             :     /* Get subname */
    2581                 :         172 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
    2582                 :             :                                    Anum_pg_subscription_subname);
    2583                 :         172 :     subname = pstrdup(NameStr(*DatumGetName(datum)));
    2584                 :             : 
    2585                 :             :     /* Get slotname */
    2586                 :         172 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
    2587                 :             :                             Anum_pg_subscription_subslotname, &isnull);
    2588         [ +  + ]:         172 :     if (!isnull)
    2589                 :          90 :         slotname = pstrdup(NameStr(*DatumGetName(datum)));
    2590                 :             :     else
    2591                 :          82 :         slotname = NULL;
    2592                 :             : 
    2593                 :             :     /*
    2594                 :             :      * Since dropping a replication slot is not transactional, the replication
    2595                 :             :      * slot stays dropped even if the transaction rolls back.  So we cannot
    2596                 :             :      * run DROP SUBSCRIPTION inside a transaction block if dropping the
    2597                 :             :      * replication slot.  Also, in this case, we report a message for dropping
    2598                 :             :      * the subscription to the cumulative stats system.
    2599                 :             :      *
    2600                 :             :      * XXX The command name should really be something like "DROP SUBSCRIPTION
    2601                 :             :      * of a subscription that is associated with a replication slot", but we
    2602                 :             :      * don't have the proper facilities for that.
    2603                 :             :      */
    2604         [ +  + ]:         172 :     if (slotname)
    2605                 :          90 :         PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
    2606                 :             : 
    2607                 :         168 :     ObjectAddressSet(myself, SubscriptionRelationId, subid);
    2608                 :         168 :     EventTriggerSQLDropAddObject(&myself, true, true);
    2609                 :             : 
    2610                 :             :     /* Remove the tuple from catalog. */
    2611                 :         168 :     CatalogTupleDelete(rel, &tup->t_self);
    2612                 :             : 
    2613                 :         168 :     ReleaseSysCache(tup);
    2614                 :             : 
    2615                 :             :     /*
    2616                 :             :      * Stop all the subscription workers immediately.
    2617                 :             :      *
    2618                 :             :      * This is necessary if we are dropping the replication slot, so that the
    2619                 :             :      * slot becomes accessible.
    2620                 :             :      *
    2621                 :             :      * It is also necessary if the subscription is disabled and was disabled
    2622                 :             :      * in the same transaction.  Then the workers haven't seen the disabling
    2623                 :             :      * yet and will still be running, leading to hangs later when we want to
    2624                 :             :      * drop the replication origin.  If the subscription was disabled before
    2625                 :             :      * this transaction, then there shouldn't be any workers left, so this
    2626                 :             :      * won't make a difference.
    2627                 :             :      *
    2628                 :             :      * New workers won't be started because we hold an exclusive lock on the
    2629                 :             :      * subscription till the end of the transaction.
    2630                 :             :      */
    2631                 :         168 :     subworkers = logicalrep_workers_find(subid, false, true);
    2632   [ +  +  +  +  :         248 :     foreach(lc, subworkers)
                   +  + ]
    2633                 :             :     {
    2634                 :          80 :         LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
    2635                 :             : 
    2636                 :          80 :         logicalrep_worker_stop(w->type, w->subid, w->relid);
    2637                 :             :     }
    2638                 :         168 :     list_free(subworkers);
    2639                 :             : 
    2640                 :             :     /*
    2641                 :             :      * Remove the no-longer-useful entry in the launcher's table of apply
    2642                 :             :      * worker start times.
    2643                 :             :      *
    2644                 :             :      * If this transaction rolls back, the launcher might restart a failed
    2645                 :             :      * apply worker before wal_retrieve_retry_interval milliseconds have
    2646                 :             :      * elapsed, but that's pretty harmless.
    2647                 :             :      */
    2648                 :         168 :     ApplyLauncherForgetWorkerStartTime(subid);
    2649                 :             : 
    2650                 :             :     /*
    2651                 :             :      * Cleanup of tablesync replication origins.
    2652                 :             :      *
    2653                 :             :      * Any READY-state relations would already have dealt with clean-ups.
    2654                 :             :      *
    2655                 :             :      * Note that the state can't change because we have already stopped both
    2656                 :             :      * the apply and tablesync workers and they can't restart because of
    2657                 :             :      * exclusive lock on the subscription.
    2658                 :             :      */
    2659                 :         168 :     rstates = GetSubscriptionRelations(subid, true, false, true);
    2660   [ +  +  +  +  :         175 :     foreach(lc, rstates)
                   +  + ]
    2661                 :             :     {
    2662                 :           7 :         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
    2663                 :           7 :         Oid         relid = rstate->relid;
    2664                 :             : 
    2665                 :             :         /* Only cleanup resources of tablesync workers */
    2666         [ -  + ]:           7 :         if (!OidIsValid(relid))
    2667                 :           0 :             continue;
    2668                 :             : 
    2669                 :             :         /*
    2670                 :             :          * Drop the tablesync's origin tracking if exists.
    2671                 :             :          *
    2672                 :             :          * It is possible that the origin is not yet created for tablesync
    2673                 :             :          * worker so passing missing_ok = true. This can happen for the states
    2674                 :             :          * before SUBREL_STATE_DATASYNC.
    2675                 :             :          */
    2676                 :           7 :         ReplicationOriginNameForLogicalRep(subid, relid, originname,
    2677                 :             :                                            sizeof(originname));
    2678                 :           7 :         replorigin_drop_by_name(originname, true, false);
    2679                 :             :     }
    2680                 :             : 
    2681                 :             :     /* Drop subscription's conflict log table */
    2682                 :         168 :     drop_sub_conflict_log_table(subid, subname, subconflictlogrelid);
    2683                 :             : 
    2684                 :             :     /* Clean up dependencies */
    2685                 :         168 :     deleteDependencyRecordsFor(SubscriptionRelationId, subid, false);
    2686                 :         168 :     deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
    2687                 :             : 
    2688                 :             :     /* Remove any associated relation synchronization states. */
    2689                 :         168 :     RemoveSubscriptionRel(subid, InvalidOid);
    2690                 :             : 
    2691                 :             :     /* Remove the origin tracking if exists. */
    2692                 :         168 :     ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
    2693                 :         168 :     replorigin_drop_by_name(originname, true, false);
    2694                 :             : 
    2695                 :             :     /*
    2696                 :             :      * Tell the cumulative stats system that the subscription is getting
    2697                 :             :      * dropped.
    2698                 :             :      */
    2699                 :         168 :     pgstat_drop_subscription(subid);
    2700                 :             : 
    2701                 :             :     /*
    2702                 :             :      * If there is no slot associated with the subscription, we can finish
    2703                 :             :      * here.
    2704                 :             :      */
    2705   [ +  +  +  + ]:         168 :     if (!slotname && rstates == NIL)
    2706                 :             :     {
    2707                 :          81 :         table_close(rel, NoLock);
    2708                 :          81 :         return;
    2709                 :             :     }
    2710                 :             : 
    2711                 :             :     /*
    2712                 :             :      * Try to acquire the connection necessary for dropping slots.
    2713                 :             :      *
    2714                 :             :      * Note: If the slotname is NONE/NULL then we allow the command to finish
    2715                 :             :      * and users need to manually cleanup the apply and tablesync worker slots
    2716                 :             :      * later.
    2717                 :             :      *
    2718                 :             :      * This has to be at the end because otherwise if there is an error while
    2719                 :             :      * doing the database operations we won't be able to rollback dropped
    2720                 :             :      * slot.
    2721                 :             :      */
    2722                 :          87 :     load_file("libpqwalreceiver", false);
    2723                 :             : 
    2724         [ +  + ]:          87 :     if (OidIsValid(subserver))
    2725                 :           8 :         conninfo = construct_subserver_conninfo(subserver, subowner, &err);
    2726                 :             :     else
    2727                 :          79 :         conninfo = subconninfo;
    2728                 :             : 
    2729         [ +  + ]:          83 :     if (conninfo)
    2730                 :          79 :         wrconn = walrcv_connect(conninfo, true, true, must_use_password,
    2731                 :             :                                 subname, &err);
    2732                 :             : 
    2733         [ +  + ]:          83 :     if (wrconn == NULL)
    2734                 :             :     {
    2735         [ -  + ]:           4 :         if (!slotname)
    2736                 :             :         {
    2737                 :             :             /* be tidy */
    2738                 :           0 :             list_free(rstates);
    2739                 :           0 :             table_close(rel, NoLock);
    2740                 :           0 :             return;
    2741                 :             :         }
    2742                 :             :         else
    2743                 :             :         {
    2744                 :           4 :             ReportSlotConnectionError(rstates, subid, slotname, err);
    2745                 :             :         }
    2746                 :             :     }
    2747                 :             : 
    2748         [ +  + ]:          79 :     PG_TRY();
    2749                 :             :     {
    2750   [ +  +  +  +  :          86 :         foreach(lc, rstates)
                   +  + ]
    2751                 :             :         {
    2752                 :           7 :             SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
    2753                 :           7 :             Oid         relid = rstate->relid;
    2754                 :             : 
    2755                 :             :             /* Only cleanup resources of tablesync workers */
    2756         [ -  + ]:           7 :             if (!OidIsValid(relid))
    2757                 :           0 :                 continue;
    2758                 :             : 
    2759                 :             :             /*
    2760                 :             :              * Drop the tablesync slots associated with removed tables.
    2761                 :             :              *
    2762                 :             :              * For SYNCDONE/READY states, the tablesync slot is known to have
    2763                 :             :              * already been dropped by the tablesync worker.
    2764                 :             :              *
    2765                 :             :              * For other states, there is no certainty, maybe the slot does
    2766                 :             :              * not exist yet. Also, if we fail after removing some of the
    2767                 :             :              * slots, next time, it will again try to drop already dropped
    2768                 :             :              * slots and fail. For these reasons, we allow missing_ok = true
    2769                 :             :              * for the drop.
    2770                 :             :              */
    2771         [ +  + ]:           7 :             if (rstate->state != SUBREL_STATE_SYNCDONE)
    2772                 :             :             {
    2773                 :           5 :                 char        syncslotname[NAMEDATALEN] = {0};
    2774                 :             : 
    2775                 :           5 :                 ReplicationSlotNameForTablesync(subid, relid, syncslotname,
    2776                 :             :                                                 sizeof(syncslotname));
    2777                 :           5 :                 ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
    2778                 :             :             }
    2779                 :             :         }
    2780                 :             : 
    2781                 :          79 :         list_free(rstates);
    2782                 :             : 
    2783                 :             :         /*
    2784                 :             :          * If there is a slot associated with the subscription, then drop the
    2785                 :             :          * replication slot at the publisher.
    2786                 :             :          */
    2787         [ +  + ]:          79 :         if (slotname)
    2788                 :          78 :             ReplicationSlotDropAtPubNode(wrconn, slotname, false);
    2789                 :             :     }
    2790                 :           1 :     PG_FINALLY();
    2791                 :             :     {
    2792                 :          79 :         walrcv_disconnect(wrconn);
    2793                 :             :     }
    2794         [ +  + ]:          79 :     PG_END_TRY();
    2795                 :             : 
    2796                 :          78 :     table_close(rel, NoLock);
    2797                 :             : }
    2798                 :             : 
    2799                 :             : /*
    2800                 :             :  * Drop the replication slot at the publisher node using the replication
    2801                 :             :  * connection.
    2802                 :             :  *
    2803                 :             :  * missing_ok - if true then only issue a LOG message if the slot doesn't
    2804                 :             :  * exist.
    2805                 :             :  */
    2806                 :             : void
    2807                 :         291 : ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
    2808                 :             : {
    2809                 :             :     StringInfoData cmd;
    2810                 :             : 
    2811                 :             :     Assert(wrconn);
    2812                 :             : 
    2813                 :         291 :     load_file("libpqwalreceiver", false);
    2814                 :             : 
    2815                 :         291 :     initStringInfo(&cmd);
    2816                 :         291 :     appendStringInfoString(&cmd, "DROP_REPLICATION_SLOT ");
    2817                 :         291 :     appendQuotedIdentifier(&cmd, slotname);
    2818                 :         291 :     appendStringInfoString(&cmd, " WAIT");
    2819                 :             : 
    2820         [ +  + ]:         291 :     PG_TRY();
    2821                 :             :     {
    2822                 :             :         WalRcvExecResult *res;
    2823                 :             : 
    2824                 :         291 :         res = walrcv_exec(wrconn, cmd.data, 0, NULL);
    2825                 :             : 
    2826         [ +  + ]:         291 :         if (res->status == WALRCV_OK_COMMAND)
    2827                 :             :         {
    2828                 :             :             /* NOTICE. Success. */
    2829         [ +  + ]:         288 :             ereport(NOTICE,
    2830                 :             :                     (errmsg("dropped replication slot \"%s\" on publisher",
    2831                 :             :                             slotname)));
    2832                 :             :         }
    2833   [ +  -  +  + ]:           3 :         else if (res->status == WALRCV_ERROR &&
    2834                 :           2 :                  missing_ok &&
    2835         [ +  - ]:           2 :                  res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
    2836                 :             :         {
    2837                 :             :             /* LOG. Error, but missing_ok = true. */
    2838         [ +  - ]:           2 :             ereport(LOG,
    2839                 :             :                     (errmsg("could not drop replication slot \"%s\" on publisher: %s",
    2840                 :             :                             slotname, res->err)));
    2841                 :             :         }
    2842                 :             :         else
    2843                 :             :         {
    2844                 :             :             /* ERROR. */
    2845         [ +  - ]:           1 :             ereport(ERROR,
    2846                 :             :                     (errcode(ERRCODE_CONNECTION_FAILURE),
    2847                 :             :                      errmsg("could not drop replication slot \"%s\" on publisher: %s",
    2848                 :             :                             slotname, res->err)));
    2849                 :             :         }
    2850                 :             : 
    2851                 :         290 :         walrcv_clear_result(res);
    2852                 :             :     }
    2853                 :           1 :     PG_FINALLY();
    2854                 :             :     {
    2855                 :         291 :         pfree(cmd.data);
    2856                 :             :     }
    2857         [ +  + ]:         291 :     PG_END_TRY();
    2858                 :         290 : }
    2859                 :             : 
    2860                 :             : /*
    2861                 :             :  * Internal workhorse for changing a subscription owner
    2862                 :             :  */
    2863                 :             : static void
    2864                 :          19 : AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
    2865                 :             : {
    2866                 :             :     Form_pg_subscription form;
    2867                 :             :     AclResult   aclresult;
    2868                 :             : 
    2869                 :          19 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    2870                 :             : 
    2871         [ +  + ]:          19 :     if (form->subowner == newOwnerId)
    2872                 :           2 :         return;
    2873                 :             : 
    2874         [ -  + ]:          17 :     if (!object_ownercheck(SubscriptionRelationId, form->oid, GetUserId()))
    2875                 :           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
    2876                 :           0 :                        NameStr(form->subname));
    2877                 :             : 
    2878                 :             :     /*
    2879                 :             :      * Don't allow non-superuser modification of a subscription with
    2880                 :             :      * password_required=false.
    2881                 :             :      */
    2882   [ -  +  -  - ]:          17 :     if (!form->subpasswordrequired && !superuser())
    2883         [ #  # ]:           0 :         ereport(ERROR,
    2884                 :             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    2885                 :             :                  errmsg("password_required=false is superuser-only"),
    2886                 :             :                  errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
    2887                 :             : 
    2888                 :             :     /* Must be able to become new owner */
    2889                 :          17 :     check_can_set_role(GetUserId(), newOwnerId);
    2890                 :             : 
    2891                 :             :     /*
    2892                 :             :      * current owner must have CREATE on database
    2893                 :             :      *
    2894                 :             :      * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
    2895                 :             :      * other object types behave differently (e.g. you can't give a table to a
    2896                 :             :      * user who lacks CREATE privileges on a schema).
    2897                 :             :      */
    2898                 :          13 :     aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
    2899                 :             :                                 GetUserId(), ACL_CREATE);
    2900         [ -  + ]:          13 :     if (aclresult != ACLCHECK_OK)
    2901                 :           0 :         aclcheck_error(aclresult, OBJECT_DATABASE,
    2902                 :           0 :                        get_database_name(MyDatabaseId));
    2903                 :             : 
    2904                 :             :     /*
    2905                 :             :      * If the subscription uses a server, check that the new owner has USAGE
    2906                 :             :      * privileges on the server and that a user mapping exists. Note: does not
    2907                 :             :      * re-check the resulting connection string.
    2908                 :             :      */
    2909         [ -  + ]:          13 :     if (OidIsValid(form->subserver))
    2910                 :             :     {
    2911                 :           0 :         ForeignServer *server = GetForeignServer(form->subserver);
    2912                 :             : 
    2913                 :           0 :         aclresult = object_aclcheck(ForeignServerRelationId, server->serverid, newOwnerId, ACL_USAGE);
    2914         [ #  # ]:           0 :         if (aclresult != ACLCHECK_OK)
    2915         [ #  # ]:           0 :             ereport(ERROR,
    2916                 :             :                     errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    2917                 :             :                     errmsg("new subscription owner \"%s\" does not have permission on foreign server \"%s\"",
    2918                 :             :                            GetUserNameFromId(newOwnerId, false),
    2919                 :             :                            server->servername));
    2920                 :             : 
    2921                 :             :         /* make sure a user mapping exists */
    2922                 :           0 :         GetUserMapping(newOwnerId, server->serverid);
    2923                 :             :     }
    2924                 :             : 
    2925                 :          13 :     form->subowner = newOwnerId;
    2926                 :          13 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    2927                 :             : 
    2928                 :             :     /* Update owner of the conflict log table if it exists. */
    2929         [ +  + ]:          13 :     if (OidIsValid(form->subconflictlogrelid))
    2930                 :           8 :         ATExecChangeOwner(form->subconflictlogrelid, newOwnerId, true,
    2931                 :             :                           AccessExclusiveLock);
    2932                 :             : 
    2933                 :             :     /* Update owner dependency reference */
    2934                 :          13 :     changeDependencyOnOwner(SubscriptionRelationId,
    2935                 :             :                             form->oid,
    2936                 :             :                             newOwnerId);
    2937                 :             : 
    2938         [ -  + ]:          13 :     InvokeObjectPostAlterHook(SubscriptionRelationId,
    2939                 :             :                               form->oid, 0);
    2940                 :             : 
    2941                 :             :     /* Wake up related background processes to handle this change quickly. */
    2942                 :          13 :     ApplyLauncherWakeupAtCommit();
    2943                 :          13 :     LogicalRepWorkersWakeupAtCommit(form->oid);
    2944                 :             : }
    2945                 :             : 
    2946                 :             : /*
    2947                 :             :  * Change subscription owner -- by name
    2948                 :             :  */
    2949                 :             : ObjectAddress
    2950                 :          19 : AlterSubscriptionOwner(const char *name, Oid newOwnerId)
    2951                 :             : {
    2952                 :             :     Oid         subid;
    2953                 :             :     HeapTuple   tup;
    2954                 :             :     Relation    rel;
    2955                 :             :     ObjectAddress address;
    2956                 :             :     Form_pg_subscription form;
    2957                 :             : 
    2958                 :          19 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    2959                 :             : 
    2960                 :          19 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
    2961                 :             :                               CStringGetDatum(name));
    2962                 :             : 
    2963         [ -  + ]:          19 :     if (!HeapTupleIsValid(tup))
    2964         [ #  # ]:           0 :         ereport(ERROR,
    2965                 :             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2966                 :             :                  errmsg("subscription \"%s\" does not exist", name)));
    2967                 :             : 
    2968                 :          19 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    2969                 :          19 :     subid = form->oid;
    2970                 :             : 
    2971                 :          19 :     AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
    2972                 :             : 
    2973                 :          15 :     ObjectAddressSet(address, SubscriptionRelationId, subid);
    2974                 :             : 
    2975                 :          15 :     heap_freetuple(tup);
    2976                 :             : 
    2977                 :          15 :     table_close(rel, RowExclusiveLock);
    2978                 :             : 
    2979                 :          15 :     return address;
    2980                 :             : }
    2981                 :             : 
    2982                 :             : /*
    2983                 :             :  * Change subscription owner -- by OID
    2984                 :             :  */
    2985                 :             : void
    2986                 :           0 : AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
    2987                 :             : {
    2988                 :             :     HeapTuple   tup;
    2989                 :             :     Relation    rel;
    2990                 :             : 
    2991                 :           0 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    2992                 :             : 
    2993                 :           0 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
    2994                 :             : 
    2995         [ #  # ]:           0 :     if (!HeapTupleIsValid(tup))
    2996         [ #  # ]:           0 :         ereport(ERROR,
    2997                 :             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2998                 :             :                  errmsg("subscription with OID %u does not exist", subid)));
    2999                 :             : 
    3000                 :           0 :     AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
    3001                 :             : 
    3002                 :           0 :     heap_freetuple(tup);
    3003                 :             : 
    3004                 :           0 :     table_close(rel, RowExclusiveLock);
    3005                 :           0 : }
    3006                 :             : 
    3007                 :             : /*
    3008                 :             :  * Check and log a warning if the publisher has subscribed to the same table,
    3009                 :             :  * its partition ancestors (if it's a partition), or its partition children (if
    3010                 :             :  * it's a partitioned table), from some other publishers. This check is
    3011                 :             :  * required in the following scenarios:
    3012                 :             :  *
    3013                 :             :  * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
    3014                 :             :  *    statements with "copy_data = true" and "origin = none":
    3015                 :             :  *    - Warn the user that data with an origin might have been copied.
    3016                 :             :  *    - This check is skipped for tables already added, as incremental sync via
    3017                 :             :  *      WAL allows origin tracking. The list of such tables is in
    3018                 :             :  *      subrel_local_oids.
    3019                 :             :  *
    3020                 :             :  * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
    3021                 :             :  *    statements with "retain_dead_tuples = true" and "origin = any", and for
    3022                 :             :  *    ALTER SUBSCRIPTION statements that modify retain_dead_tuples or origin,
    3023                 :             :  *    or when the publisher's status changes (e.g., due to a connection string
    3024                 :             :  *    update):
    3025                 :             :  *    - Warn the user that only conflict detection info for local changes on
    3026                 :             :  *      the publisher is retained. Data from other origins may lack sufficient
    3027                 :             :  *      details for reliable conflict detection.
    3028                 :             :  *    - See comments atop worker.c for more details.
    3029                 :             :  */
    3030                 :             : static void
    3031                 :         177 : check_publications_origin_tables(WalReceiverConn *wrconn, List *publications,
    3032                 :             :                                  bool copydata, bool retain_dead_tuples,
    3033                 :             :                                  char *origin, Oid *subrel_local_oids,
    3034                 :             :                                  int subrel_count, char *subname)
    3035                 :             : {
    3036                 :             :     WalRcvExecResult *res;
    3037                 :             :     StringInfoData cmd;
    3038                 :             :     TupleTableSlot *slot;
    3039                 :         177 :     Oid         tableRow[1] = {TEXTOID};
    3040                 :         177 :     List       *publist = NIL;
    3041                 :             :     int         i;
    3042                 :             :     bool        check_rdt;
    3043                 :             :     bool        check_table_sync;
    3044   [ +  -  +  + ]:         354 :     bool        origin_none = origin &&
    3045                 :         177 :         pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0;
    3046                 :             : 
    3047                 :             :     /*
    3048                 :             :      * Enable retain_dead_tuples checks only when origin is set to 'any',
    3049                 :             :      * since with origin='none' only local changes are replicated to the
    3050                 :             :      * subscriber.
    3051                 :             :      */
    3052   [ +  +  +  + ]:         177 :     check_rdt = retain_dead_tuples && !origin_none;
    3053                 :             : 
    3054                 :             :     /*
    3055                 :             :      * Enable table synchronization checks only when origin is 'none', to
    3056                 :             :      * ensure that data from other origins is not inadvertently copied.
    3057                 :             :      */
    3058   [ +  +  +  + ]:         177 :     check_table_sync = copydata && origin_none;
    3059                 :             : 
    3060                 :             :     /* retain_dead_tuples and table sync checks occur separately */
    3061                 :             :     Assert(!(check_rdt && check_table_sync));
    3062                 :             : 
    3063                 :             :     /* Return if no checks are required */
    3064   [ +  +  +  + ]:         177 :     if (!check_rdt && !check_table_sync)
    3065                 :         163 :         return;
    3066                 :             : 
    3067                 :          14 :     initStringInfo(&cmd);
    3068                 :          14 :     appendStringInfoString(&cmd,
    3069                 :             :                            "SELECT DISTINCT P.pubname AS pubname\n"
    3070                 :             :                            "FROM pg_publication P,\n"
    3071                 :             :                            "     LATERAL pg_get_publication_tables(P.pubname) GPT\n"
    3072                 :             :                            "     JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid OR"
    3073                 :             :                            "     GPT.relid IN (SELECT relid FROM pg_partition_ancestors(PS.srrelid) UNION"
    3074                 :             :                            "                   SELECT relid FROM pg_partition_tree(PS.srrelid))),\n"
    3075                 :             :                            "     pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
    3076                 :             :                            "WHERE C.oid = GPT.relid AND P.pubname IN (");
    3077                 :          14 :     GetPublicationsStr(publications, &cmd, true);
    3078                 :          14 :     appendStringInfoString(&cmd, ")\n");
    3079                 :             : 
    3080                 :             :     /*
    3081                 :             :      * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
    3082                 :             :      * subrel_local_oids contains the list of relation oids that are already
    3083                 :             :      * present on the subscriber. This check should be skipped for these
    3084                 :             :      * tables if checking for table sync scenario. However, when handling the
    3085                 :             :      * retain_dead_tuples scenario, ensure all tables are checked, as some
    3086                 :             :      * existing tables may now include changes from other origins due to newly
    3087                 :             :      * created subscriptions on the publisher.
    3088                 :             :      */
    3089         [ +  + ]:          14 :     if (check_table_sync)
    3090                 :             :     {
    3091         [ +  + ]:          14 :         for (i = 0; i < subrel_count; i++)
    3092                 :             :         {
    3093                 :           4 :             Oid         relid = subrel_local_oids[i];
    3094                 :           4 :             char       *schemaname = get_namespace_name(get_rel_namespace(relid));
    3095                 :           4 :             char       *tablename = get_rel_name(relid);
    3096                 :           4 :             char       *schemaname_lit = quote_literal_cstr(schemaname);
    3097                 :           4 :             char       *tablename_lit = quote_literal_cstr(tablename);
    3098                 :             : 
    3099                 :           4 :             appendStringInfo(&cmd, "AND NOT (N.nspname = %s AND C.relname = %s)\n",
    3100                 :             :                              schemaname_lit, tablename_lit);
    3101                 :             : 
    3102                 :           4 :             pfree(schemaname_lit);
    3103                 :           4 :             pfree(tablename_lit);
    3104                 :             :         }
    3105                 :             :     }
    3106                 :             : 
    3107                 :          14 :     res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
    3108                 :          14 :     pfree(cmd.data);
    3109                 :             : 
    3110         [ -  + ]:          14 :     if (res->status != WALRCV_OK_TUPLES)
    3111         [ #  # ]:           0 :         ereport(ERROR,
    3112                 :             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    3113                 :             :                  errmsg("could not receive list of replicated tables from the publisher: %s",
    3114                 :             :                         res->err)));
    3115                 :             : 
    3116                 :             :     /* Process publications. */
    3117                 :          14 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    3118         [ +  + ]:          19 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    3119                 :             :     {
    3120                 :             :         char       *pubname;
    3121                 :             :         bool        isnull;
    3122                 :             : 
    3123                 :           5 :         pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
    3124                 :             :         Assert(!isnull);
    3125                 :             : 
    3126                 :           5 :         ExecClearTuple(slot);
    3127                 :           5 :         publist = list_append_unique(publist, makeString(pubname));
    3128                 :             :     }
    3129                 :             : 
    3130                 :             :     /*
    3131                 :             :      * Log a warning if the publisher has subscribed to the same table from
    3132                 :             :      * some other publisher. We cannot know the origin of data during the
    3133                 :             :      * initial sync. Data origins can be found only from the WAL by looking at
    3134                 :             :      * the origin id.
    3135                 :             :      *
    3136                 :             :      * XXX: For simplicity, we don't check whether the table has any data or
    3137                 :             :      * not. If the table doesn't have any data then we don't need to
    3138                 :             :      * distinguish between data having origin and data not having origin so we
    3139                 :             :      * can avoid logging a warning for table sync scenario.
    3140                 :             :      */
    3141         [ +  + ]:          14 :     if (publist)
    3142                 :             :     {
    3143                 :             :         StringInfoData pubnames;
    3144                 :             : 
    3145                 :             :         /* Prepare the list of publication(s) for warning message. */
    3146                 :           5 :         initStringInfo(&pubnames);
    3147                 :           5 :         GetPublicationsStr(publist, &pubnames, false);
    3148                 :             : 
    3149         [ +  + ]:           5 :         if (check_table_sync)
    3150         [ +  - ]:           4 :             ereport(WARNING,
    3151                 :             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3152                 :             :                     errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
    3153                 :             :                            subname),
    3154                 :             :                     errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
    3155                 :             :                                      "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
    3156                 :             :                                      list_length(publist), pubnames.data),
    3157                 :             :                     errhint("Verify that initial data copied from the publisher tables did not come from other origins."));
    3158                 :             :         else
    3159         [ +  - ]:           1 :             ereport(WARNING,
    3160                 :             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3161                 :             :                     errmsg("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins",
    3162                 :             :                            subname),
    3163                 :             :                     errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
    3164                 :             :                                      "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
    3165                 :             :                                      list_length(publist), pubnames.data),
    3166                 :             :                     errhint("Consider using origin = NONE or disabling retain_dead_tuples."));
    3167                 :             :     }
    3168                 :             : 
    3169                 :          14 :     ExecDropSingleTupleTableSlot(slot);
    3170                 :             : 
    3171                 :          14 :     walrcv_clear_result(res);
    3172                 :             : }
    3173                 :             : 
    3174                 :             : /*
    3175                 :             :  * This function is similar to check_publications_origin_tables and serves
    3176                 :             :  * same purpose for sequences.
    3177                 :             :  */
    3178                 :             : static void
    3179                 :         167 : check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications,
    3180                 :             :                                     bool copydata, char *origin,
    3181                 :             :                                     Oid *subrel_local_oids, int subrel_count,
    3182                 :             :                                     char *subname)
    3183                 :             : {
    3184                 :             :     WalRcvExecResult *res;
    3185                 :             :     StringInfoData cmd;
    3186                 :             :     TupleTableSlot *slot;
    3187                 :         167 :     Oid         tableRow[1] = {TEXTOID};
    3188                 :         167 :     List       *publist = NIL;
    3189                 :             : 
    3190                 :             :     /*
    3191                 :             :      * Enable sequence synchronization checks only when origin is 'none' , to
    3192                 :             :      * ensure that sequence data from other origins is not inadvertently
    3193                 :             :      * copied. This check is necessary if the publisher is running PG19 or
    3194                 :             :      * later, where logical replication sequence synchronization is supported.
    3195                 :             :      */
    3196   [ +  +  +  +  :         177 :     if (!copydata || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0 ||
                   -  + ]
    3197                 :          10 :         walrcv_server_version(wrconn) < 190000)
    3198                 :         157 :         return;
    3199                 :             : 
    3200                 :          10 :     initStringInfo(&cmd);
    3201                 :          10 :     appendStringInfoString(&cmd,
    3202                 :             :                            "SELECT DISTINCT P.pubname AS pubname\n"
    3203                 :             :                            "FROM pg_publication P,\n"
    3204                 :             :                            "     LATERAL pg_get_publication_sequences(P.pubname) GPS\n"
    3205                 :             :                            "     JOIN pg_subscription_rel PS ON (GPS.relid = PS.srrelid),\n"
    3206                 :             :                            "     pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
    3207                 :             :                            "WHERE C.oid = GPS.relid AND P.pubname IN (");
    3208                 :             : 
    3209                 :          10 :     GetPublicationsStr(publications, &cmd, true);
    3210                 :          10 :     appendStringInfoString(&cmd, ")\n");
    3211                 :             : 
    3212                 :             :     /*
    3213                 :             :      * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
    3214                 :             :      * subrel_local_oids contains the list of relations that are already
    3215                 :             :      * present on the subscriber. This check should be skipped as these will
    3216                 :             :      * not be re-synced.
    3217                 :             :      */
    3218         [ -  + ]:          10 :     for (int i = 0; i < subrel_count; i++)
    3219                 :             :     {
    3220                 :           0 :         Oid         relid = subrel_local_oids[i];
    3221                 :           0 :         char       *schemaname = get_namespace_name(get_rel_namespace(relid));
    3222                 :           0 :         char       *seqname = get_rel_name(relid);
    3223                 :           0 :         char       *schemaname_lit = quote_literal_cstr(schemaname);
    3224                 :           0 :         char       *seqname_lit = quote_literal_cstr(seqname);
    3225                 :             : 
    3226                 :           0 :         appendStringInfo(&cmd,
    3227                 :             :                          "AND NOT (N.nspname = %s AND C.relname = %s)\n",
    3228                 :             :                          schemaname_lit, seqname_lit);
    3229                 :             : 
    3230                 :           0 :         pfree(schemaname_lit);
    3231                 :           0 :         pfree(seqname_lit);
    3232                 :             :     }
    3233                 :             : 
    3234                 :          10 :     res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
    3235                 :          10 :     pfree(cmd.data);
    3236                 :             : 
    3237         [ -  + ]:          10 :     if (res->status != WALRCV_OK_TUPLES)
    3238         [ #  # ]:           0 :         ereport(ERROR,
    3239                 :             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    3240                 :             :                  errmsg("could not receive list of replicated sequences from the publisher: %s",
    3241                 :             :                         res->err)));
    3242                 :             : 
    3243                 :             :     /* Process publications. */
    3244                 :          10 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    3245         [ -  + ]:          10 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    3246                 :             :     {
    3247                 :             :         char       *pubname;
    3248                 :             :         bool        isnull;
    3249                 :             : 
    3250                 :           0 :         pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
    3251                 :             :         Assert(!isnull);
    3252                 :             : 
    3253                 :           0 :         ExecClearTuple(slot);
    3254                 :           0 :         publist = list_append_unique(publist, makeString(pubname));
    3255                 :             :     }
    3256                 :             : 
    3257                 :             :     /*
    3258                 :             :      * Log a warning if the publisher has subscribed to the same sequence from
    3259                 :             :      * some other publisher. We cannot know the origin of sequences data
    3260                 :             :      * during the initial sync.
    3261                 :             :      */
    3262         [ -  + ]:          10 :     if (publist)
    3263                 :             :     {
    3264                 :             :         StringInfoData pubnames;
    3265                 :             : 
    3266                 :             :         /* Prepare the list of publication(s) for warning message. */
    3267                 :           0 :         initStringInfo(&pubnames);
    3268                 :           0 :         GetPublicationsStr(publist, &pubnames, false);
    3269                 :             : 
    3270         [ #  # ]:           0 :         ereport(WARNING,
    3271                 :             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3272                 :             :                 errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
    3273                 :             :                        subname),
    3274                 :             :                 errdetail_plural("The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.",
    3275                 :             :                                  "The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.",
    3276                 :             :                                  list_length(publist), pubnames.data),
    3277                 :             :                 errhint("Verify that initial data copied from the publisher sequences did not come from other origins."));
    3278                 :             :     }
    3279                 :             : 
    3280                 :          10 :     ExecDropSingleTupleTableSlot(slot);
    3281                 :             : 
    3282                 :          10 :     walrcv_clear_result(res);
    3283                 :             : }
    3284                 :             : 
    3285                 :             : /*
    3286                 :             :  * Determine whether the retain_dead_tuples can be enabled based on the
    3287                 :             :  * publisher's status.
    3288                 :             :  *
    3289                 :             :  * This option is disallowed if the publisher is running a version earlier
    3290                 :             :  * than the PG19, or if the publisher is in recovery (i.e., it is a standby
    3291                 :             :  * server).
    3292                 :             :  *
    3293                 :             :  * See comments atop worker.c for a detailed explanation.
    3294                 :             :  */
    3295                 :             : static void
    3296                 :          12 : check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
    3297                 :             : {
    3298                 :             :     WalRcvExecResult *res;
    3299                 :          12 :     Oid         RecoveryRow[1] = {BOOLOID};
    3300                 :             :     TupleTableSlot *slot;
    3301                 :             :     bool        isnull;
    3302                 :             :     bool        remote_in_recovery;
    3303                 :             : 
    3304         [ -  + ]:          12 :     if (walrcv_server_version(wrconn) < 190000)
    3305         [ #  # ]:           0 :         ereport(ERROR,
    3306                 :             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3307                 :             :                 errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
    3308                 :             : 
    3309                 :          12 :     res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow);
    3310                 :             : 
    3311         [ -  + ]:          12 :     if (res->status != WALRCV_OK_TUPLES)
    3312         [ #  # ]:           0 :         ereport(ERROR,
    3313                 :             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    3314                 :             :                  errmsg("could not obtain recovery progress from the publisher: %s",
    3315                 :             :                         res->err)));
    3316                 :             : 
    3317                 :          12 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    3318         [ -  + ]:          12 :     if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    3319         [ #  # ]:           0 :         elog(ERROR, "failed to fetch tuple for the recovery progress");
    3320                 :             : 
    3321                 :          12 :     remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull));
    3322                 :             : 
    3323         [ -  + ]:          12 :     if (remote_in_recovery)
    3324         [ #  # ]:           0 :         ereport(ERROR,
    3325                 :             :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3326                 :             :                 errmsg("cannot enable retain_dead_tuples if the publisher is in recovery"));
    3327                 :             : 
    3328                 :          12 :     ExecDropSingleTupleTableSlot(slot);
    3329                 :             : 
    3330                 :          12 :     walrcv_clear_result(res);
    3331                 :          12 : }
    3332                 :             : 
    3333                 :             : /*
    3334                 :             :  * Check if the subscriber's configuration is adequate to enable the
    3335                 :             :  * retain_dead_tuples option.
    3336                 :             :  *
    3337                 :             :  * Issue an ERROR if the wal_level does not support the use of replication
    3338                 :             :  * slots when check_guc is set to true.
    3339                 :             :  *
    3340                 :             :  * Issue a WARNING if track_commit_timestamp is not enabled when check_guc is
    3341                 :             :  * set to true. This is only to highlight the importance of enabling
    3342                 :             :  * track_commit_timestamp instead of catching all the misconfigurations, as
    3343                 :             :  * this setting can be adjusted after subscription creation. Without it, the
    3344                 :             :  * apply worker will simply skip conflict detection.
    3345                 :             :  *
    3346                 :             :  * Issue a WARNING or NOTICE if the subscription is disabled and the retention
    3347                 :             :  * is active. Do not raise an ERROR since users can only modify
    3348                 :             :  * retain_dead_tuples for disabled subscriptions. And as long as the
    3349                 :             :  * subscription is enabled promptly, it will not pose issues.
    3350                 :             :  *
    3351                 :             :  * Issue a NOTICE to inform users that max_retention_duration is
    3352                 :             :  * ineffective when retain_dead_tuples is disabled for a subscription. An ERROR
    3353                 :             :  * is not issued because setting max_retention_duration causes no harm,
    3354                 :             :  * even when it is ineffective.
    3355                 :             :  */
    3356                 :             : void
    3357                 :         330 : CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
    3358                 :             :                            int elevel_for_sub_disabled,
    3359                 :             :                            bool retain_dead_tuples, bool retention_active,
    3360                 :             :                            bool max_retention_set)
    3361                 :             : {
    3362                 :             :     Assert(elevel_for_sub_disabled == NOTICE ||
    3363                 :             :            elevel_for_sub_disabled == WARNING);
    3364                 :             : 
    3365         [ +  + ]:         330 :     if (retain_dead_tuples)
    3366                 :             :     {
    3367   [ +  +  -  + ]:          17 :         if (check_guc && wal_level < WAL_LEVEL_REPLICA)
    3368         [ #  # ]:           0 :             ereport(ERROR,
    3369                 :             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3370                 :             :                     errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
    3371                 :             :                     errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
    3372                 :             : 
    3373   [ +  +  +  + ]:          17 :         if (check_guc && !track_commit_timestamp)
    3374         [ +  - ]:           4 :             ereport(WARNING,
    3375                 :             :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3376                 :             :                     errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
    3377                 :             :                     errhint("Consider setting \"%s\" to true.",
    3378                 :             :                             "track_commit_timestamp"));
    3379                 :             : 
    3380   [ +  +  +  - ]:          17 :         if (sub_disabled && retention_active)
    3381   [ +  -  +  + ]:           7 :             ereport(elevel_for_sub_disabled,
    3382                 :             :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3383                 :             :                     errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
    3384                 :             :                     (elevel_for_sub_disabled > NOTICE)
    3385                 :             :                     ? errhint("Consider setting %s to false.",
    3386                 :             :                               "retain_dead_tuples") : 0);
    3387                 :             :     }
    3388         [ +  + ]:         313 :     else if (max_retention_set)
    3389                 :             :     {
    3390         [ +  - ]:           4 :         ereport(NOTICE,
    3391                 :             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3392                 :             :                 errmsg("max_retention_duration is ineffective when retain_dead_tuples is disabled"));
    3393                 :             :     }
    3394                 :         330 : }
    3395                 :             : 
    3396                 :             : /*
    3397                 :             :  * Return true iff 'rv' is a member of the list.
    3398                 :             :  */
    3399                 :             : static bool
    3400                 :         287 : list_member_rangevar(const List *list, RangeVar *rv)
    3401                 :             : {
    3402   [ +  +  +  +  :        1044 :     foreach_ptr(PublicationRelKind, relinfo, list)
                   +  + ]
    3403                 :             :     {
    3404         [ +  + ]:         472 :         if (equal(relinfo->rv, rv))
    3405                 :           1 :             return true;
    3406                 :             :     }
    3407                 :             : 
    3408                 :         286 :     return false;
    3409                 :             : }
    3410                 :             : 
    3411                 :             : /*
    3412                 :             :  * Get the list of tables and sequences which belong to specified publications
    3413                 :             :  * on the publisher connection.
    3414                 :             :  *
    3415                 :             :  * Note that we don't support the case where the column list is different for
    3416                 :             :  * the same table in different publications to avoid sending unwanted column
    3417                 :             :  * information for some of the rows. This can happen when both the column
    3418                 :             :  * list and row filter are specified for different publications.
    3419                 :             :  */
    3420                 :             : static List *
    3421                 :         164 : fetch_relation_list(WalReceiverConn *wrconn, List *publications)
    3422                 :             : {
    3423                 :             :     WalRcvExecResult *res;
    3424                 :             :     StringInfoData cmd;
    3425                 :             :     TupleTableSlot *slot;
    3426                 :         164 :     Oid         tableRow[4] = {TEXTOID, TEXTOID, CHAROID, InvalidOid};
    3427                 :         164 :     List       *relationlist = NIL;
    3428                 :         164 :     int         server_version = walrcv_server_version(wrconn);
    3429                 :         164 :     bool        check_columnlist = (server_version >= 150000);
    3430         [ +  - ]:         164 :     int         column_count = check_columnlist ? 4 : 3;
    3431                 :             :     StringInfoData pub_names;
    3432                 :             : 
    3433                 :         164 :     initStringInfo(&cmd);
    3434                 :         164 :     initStringInfo(&pub_names);
    3435                 :             : 
    3436                 :             :     /* Build the pub_names comma-separated string. */
    3437                 :         164 :     GetPublicationsStr(publications, &pub_names, true);
    3438                 :             : 
    3439                 :             :     /* Get the list of relations from the publisher */
    3440         [ +  - ]:         164 :     if (server_version >= 160000)
    3441                 :             :     {
    3442                 :         164 :         tableRow[3] = INT2VECTOROID;
    3443                 :             : 
    3444                 :             :         /*
    3445                 :             :          * From version 16, we allowed passing multiple publications to the
    3446                 :             :          * function pg_get_publication_tables. This helped to filter out the
    3447                 :             :          * partition table whose ancestor is also published in this
    3448                 :             :          * publication array.
    3449                 :             :          *
    3450                 :             :          * Join pg_get_publication_tables with pg_publication to exclude
    3451                 :             :          * non-existing publications.
    3452                 :             :          *
    3453                 :             :          * Note that attrs are always stored in sorted order so we don't need
    3454                 :             :          * to worry if different publications have specified them in a
    3455                 :             :          * different order. See pub_collist_validate.
    3456                 :             :          */
    3457                 :         164 :         appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, c.relkind, gpt.attrs\n"
    3458                 :             :                          "   FROM pg_class c\n"
    3459                 :             :                          "         JOIN pg_namespace n ON n.oid = c.relnamespace\n"
    3460                 :             :                          "         JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
    3461                 :             :                          "                FROM pg_publication\n"
    3462                 :             :                          "                WHERE pubname IN ( %s )) AS gpt\n"
    3463                 :             :                          "             ON gpt.relid = c.oid\n",
    3464                 :             :                          pub_names.data);
    3465                 :             : 
    3466                 :             :         /* From version 19, inclusion of sequences in the target is supported */
    3467         [ +  - ]:         164 :         if (server_version >= 190000)
    3468                 :         164 :             appendStringInfo(&cmd,
    3469                 :             :                              "UNION ALL\n"
    3470                 :             :                              "  SELECT DISTINCT s.schemaname, s.sequencename, " CppAsString2(RELKIND_SEQUENCE) "::\"char\" AS relkind, NULL::int2vector AS attrs\n"
    3471                 :             :                              "  FROM pg_catalog.pg_publication_sequences s\n"
    3472                 :             :                              "  WHERE s.pubname IN ( %s )",
    3473                 :             :                              pub_names.data);
    3474                 :             :     }
    3475                 :             :     else
    3476                 :             :     {
    3477                 :           0 :         tableRow[3] = NAMEARRAYOID;
    3478                 :           0 :         appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, " CppAsString2(RELKIND_RELATION) "::\"char\" AS relkind \n");
    3479                 :             : 
    3480                 :             :         /* Get column lists for each relation if the publisher supports it */
    3481         [ #  # ]:           0 :         if (check_columnlist)
    3482                 :           0 :             appendStringInfoString(&cmd, ", t.attnames\n");
    3483                 :             : 
    3484                 :           0 :         appendStringInfo(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
    3485                 :             :                          " WHERE t.pubname IN ( %s )",
    3486                 :             :                          pub_names.data);
    3487                 :             :     }
    3488                 :             : 
    3489                 :         164 :     pfree(pub_names.data);
    3490                 :             : 
    3491                 :         164 :     res = walrcv_exec(wrconn, cmd.data, column_count, tableRow);
    3492                 :         164 :     pfree(cmd.data);
    3493                 :             : 
    3494         [ -  + ]:         164 :     if (res->status != WALRCV_OK_TUPLES)
    3495         [ #  # ]:           0 :         ereport(ERROR,
    3496                 :             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    3497                 :             :                  errmsg("could not receive list of replicated tables from the publisher: %s",
    3498                 :             :                         res->err)));
    3499                 :             : 
    3500                 :             :     /* Process tables. */
    3501                 :         164 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    3502         [ +  + ]:         467 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    3503                 :             :     {
    3504                 :             :         char       *nspname;
    3505                 :             :         char       *relname;
    3506                 :             :         bool        isnull;
    3507                 :             :         char        relkind;
    3508                 :         304 :         PublicationRelKind *relinfo = palloc_object(PublicationRelKind);
    3509                 :             : 
    3510                 :         304 :         nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
    3511                 :             :         Assert(!isnull);
    3512                 :         304 :         relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
    3513                 :             :         Assert(!isnull);
    3514                 :         304 :         relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
    3515                 :             :         Assert(!isnull);
    3516                 :             : 
    3517                 :         304 :         relinfo->rv = makeRangeVar(nspname, relname, -1);
    3518                 :         304 :         relinfo->relkind = relkind;
    3519                 :             : 
    3520   [ +  +  +  - ]:         304 :         if (relkind != RELKIND_SEQUENCE &&
    3521         [ +  + ]:         287 :             check_columnlist &&
    3522                 :         287 :             list_member_rangevar(relationlist, relinfo->rv))
    3523         [ +  - ]:           1 :             ereport(ERROR,
    3524                 :             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3525                 :             :                     errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
    3526                 :             :                            nspname, relname));
    3527                 :             :         else
    3528                 :         303 :             relationlist = lappend(relationlist, relinfo);
    3529                 :             : 
    3530                 :         303 :         ExecClearTuple(slot);
    3531                 :             :     }
    3532                 :         163 :     ExecDropSingleTupleTableSlot(slot);
    3533                 :             : 
    3534                 :         163 :     walrcv_clear_result(res);
    3535                 :             : 
    3536                 :         163 :     return relationlist;
    3537                 :             : }
    3538                 :             : 
    3539                 :             : /*
    3540                 :             :  * This is to report the connection failure while dropping replication slots.
    3541                 :             :  * Here, we report the WARNING for all tablesync slots so that user can drop
    3542                 :             :  * them manually, if required.
    3543                 :             :  */
    3544                 :             : static void
    3545                 :           4 : ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
    3546                 :             : {
    3547                 :             :     ListCell   *lc;
    3548                 :             : 
    3549   [ -  +  -  -  :           4 :     foreach(lc, rstates)
                   -  + ]
    3550                 :             :     {
    3551                 :           0 :         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
    3552                 :           0 :         Oid         relid = rstate->relid;
    3553                 :             : 
    3554                 :             :         /* Only cleanup resources of tablesync workers */
    3555         [ #  # ]:           0 :         if (!OidIsValid(relid))
    3556                 :           0 :             continue;
    3557                 :             : 
    3558                 :             :         /*
    3559                 :             :          * Caller needs to ensure that relstate doesn't change underneath us.
    3560                 :             :          * See DropSubscription where we get the relstates.
    3561                 :             :          */
    3562         [ #  # ]:           0 :         if (rstate->state != SUBREL_STATE_SYNCDONE)
    3563                 :             :         {
    3564                 :           0 :             char        syncslotname[NAMEDATALEN] = {0};
    3565                 :             : 
    3566                 :           0 :             ReplicationSlotNameForTablesync(subid, relid, syncslotname,
    3567                 :             :                                             sizeof(syncslotname));
    3568         [ #  # ]:           0 :             elog(WARNING, "could not drop tablesync replication slot \"%s\"",
    3569                 :             :                  syncslotname);
    3570                 :             :         }
    3571                 :             :     }
    3572                 :             : 
    3573         [ +  - ]:           4 :     ereport(ERROR,
    3574                 :             :             (errcode(ERRCODE_CONNECTION_FAILURE),
    3575                 :             :              errmsg("could not connect to publisher when attempting to drop replication slot \"%s\": %s",
    3576                 :             :                     slotname, err),
    3577                 :             :     /* translator: %s is an SQL ALTER command */
    3578                 :             :              errhint("Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
    3579                 :             :                      "ALTER SUBSCRIPTION ... DISABLE",
    3580                 :             :                      "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
    3581                 :             : }
    3582                 :             : 
    3583                 :             : /*
    3584                 :             :  * Check for duplicates in the given list of publications and error out if
    3585                 :             :  * found one.  Add publications to datums as text datums, if datums is not
    3586                 :             :  * NULL.
    3587                 :             :  */
    3588                 :             : static void
    3589                 :         285 : check_duplicates_in_publist(List *publist, Datum *datums)
    3590                 :             : {
    3591                 :             :     ListCell   *cell;
    3592                 :         285 :     int         j = 0;
    3593                 :             : 
    3594   [ +  -  +  +  :         644 :     foreach(cell, publist)
                   +  + ]
    3595                 :             :     {
    3596                 :         371 :         char       *name = strVal(lfirst(cell));
    3597                 :             :         ListCell   *pcell;
    3598                 :             : 
    3599   [ +  -  +  -  :         534 :         foreach(pcell, publist)
                   +  - ]
    3600                 :             :         {
    3601                 :         534 :             char       *pname = strVal(lfirst(pcell));
    3602                 :             : 
    3603         [ +  + ]:         534 :             if (pcell == cell)
    3604                 :         359 :                 break;
    3605                 :             : 
    3606         [ +  + ]:         175 :             if (strcmp(name, pname) == 0)
    3607         [ +  - ]:          12 :                 ereport(ERROR,
    3608                 :             :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
    3609                 :             :                          errmsg("publication name \"%s\" used more than once",
    3610                 :             :                                 pname)));
    3611                 :             :         }
    3612                 :             : 
    3613         [ +  + ]:         359 :         if (datums)
    3614                 :         303 :             datums[j++] = CStringGetTextDatum(name);
    3615                 :             :     }
    3616                 :         273 : }
    3617                 :             : 
    3618                 :             : /*
    3619                 :             :  * Merge current subscription's publications and user-specified publications
    3620                 :             :  * from ADD/DROP PUBLICATIONS.
    3621                 :             :  *
    3622                 :             :  * If addpub is true, we will add the list of publications into oldpublist.
    3623                 :             :  * Otherwise, we will delete the list of publications from oldpublist.  The
    3624                 :             :  * returned list is a copy, oldpublist itself is not changed.
    3625                 :             :  *
    3626                 :             :  * subname is the subscription name, for error messages.
    3627                 :             :  */
    3628                 :             : static List *
    3629                 :          35 : merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
    3630                 :             : {
    3631                 :             :     ListCell   *lc;
    3632                 :             : 
    3633                 :          35 :     oldpublist = list_copy(oldpublist);
    3634                 :             : 
    3635                 :          35 :     check_duplicates_in_publist(newpublist, NULL);
    3636                 :             : 
    3637   [ +  -  +  +  :          59 :     foreach(lc, newpublist)
                   +  + ]
    3638                 :             :     {
    3639                 :          44 :         char       *name = strVal(lfirst(lc));
    3640                 :             :         ListCell   *lc2;
    3641                 :          44 :         bool        found = false;
    3642                 :             : 
    3643   [ +  -  +  +  :          86 :         foreach(lc2, oldpublist)
                   +  + ]
    3644                 :             :         {
    3645                 :          71 :             char       *pubname = strVal(lfirst(lc2));
    3646                 :             : 
    3647         [ +  + ]:          71 :             if (strcmp(name, pubname) == 0)
    3648                 :             :             {
    3649                 :          29 :                 found = true;
    3650         [ +  + ]:          29 :                 if (addpub)
    3651         [ +  - ]:           8 :                     ereport(ERROR,
    3652                 :             :                             (errcode(ERRCODE_DUPLICATE_OBJECT),
    3653                 :             :                              errmsg("publication \"%s\" is already in subscription \"%s\"",
    3654                 :             :                                     name, subname)));
    3655                 :             :                 else
    3656                 :          21 :                     oldpublist = foreach_delete_current(oldpublist, lc2);
    3657                 :             : 
    3658                 :          21 :                 break;
    3659                 :             :             }
    3660                 :             :         }
    3661                 :             : 
    3662   [ +  +  +  - ]:          36 :         if (addpub && !found)
    3663                 :          11 :             oldpublist = lappend(oldpublist, makeString(name));
    3664   [ +  -  +  + ]:          25 :         else if (!addpub && !found)
    3665         [ +  - ]:           4 :             ereport(ERROR,
    3666                 :             :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    3667                 :             :                      errmsg("publication \"%s\" is not in subscription \"%s\"",
    3668                 :             :                             name, subname)));
    3669                 :             :     }
    3670                 :             : 
    3671                 :             :     /*
    3672                 :             :      * XXX Probably no strong reason for this, but for now it's to make ALTER
    3673                 :             :      * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
    3674                 :             :      */
    3675         [ +  + ]:          15 :     if (!oldpublist)
    3676         [ +  - ]:           4 :         ereport(ERROR,
    3677                 :             :                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    3678                 :             :                  errmsg("cannot drop all the publications from a subscription")));
    3679                 :             : 
    3680                 :          11 :     return oldpublist;
    3681                 :             : }
    3682                 :             : 
    3683                 :             : /*
    3684                 :             :  * Extract the streaming mode value from a DefElem.  This is like
    3685                 :             :  * defGetBoolean() but also accepts the special value of "parallel".
    3686                 :             :  */
    3687                 :             : char
    3688                 :         461 : defGetStreamingMode(DefElem *def)
    3689                 :             : {
    3690                 :             :     /*
    3691                 :             :      * If no parameter value given, assume "true" is meant.
    3692                 :             :      */
    3693         [ -  + ]:         461 :     if (!def->arg)
    3694                 :           0 :         return LOGICALREP_STREAM_ON;
    3695                 :             : 
    3696                 :             :     /*
    3697                 :             :      * Allow 0, 1, "false", "true", "off", "on" or "parallel".
    3698                 :             :      */
    3699         [ -  + ]:         461 :     switch (nodeTag(def->arg))
    3700                 :             :     {
    3701                 :           0 :         case T_Integer:
    3702      [ #  #  # ]:           0 :             switch (intVal(def->arg))
    3703                 :             :             {
    3704                 :           0 :                 case 0:
    3705                 :           0 :                     return LOGICALREP_STREAM_OFF;
    3706                 :           0 :                 case 1:
    3707                 :           0 :                     return LOGICALREP_STREAM_ON;
    3708                 :           0 :                 default:
    3709                 :             :                     /* otherwise, error out below */
    3710                 :           0 :                     break;
    3711                 :             :             }
    3712                 :           0 :             break;
    3713                 :         461 :         default:
    3714                 :             :             {
    3715                 :         461 :                 char       *sval = defGetString(def);
    3716                 :             : 
    3717                 :             :                 /*
    3718                 :             :                  * The set of strings accepted here should match up with the
    3719                 :             :                  * grammar's opt_boolean_or_string production.
    3720                 :             :                  */
    3721   [ +  +  +  + ]:         918 :                 if (pg_strcasecmp(sval, "false") == 0 ||
    3722                 :         457 :                     pg_strcasecmp(sval, "off") == 0)
    3723                 :           7 :                     return LOGICALREP_STREAM_OFF;
    3724   [ +  +  +  + ]:         896 :                 if (pg_strcasecmp(sval, "true") == 0 ||
    3725                 :         442 :                     pg_strcasecmp(sval, "on") == 0)
    3726                 :          47 :                     return LOGICALREP_STREAM_ON;
    3727         [ +  + ]:         407 :                 if (pg_strcasecmp(sval, "parallel") == 0)
    3728                 :         403 :                     return LOGICALREP_STREAM_PARALLEL;
    3729                 :             :             }
    3730                 :           4 :             break;
    3731                 :             :     }
    3732                 :             : 
    3733         [ +  - ]:           4 :     ereport(ERROR,
    3734                 :             :             (errcode(ERRCODE_SYNTAX_ERROR),
    3735                 :             :              errmsg("%s requires a Boolean value or \"parallel\"",
    3736                 :             :                     def->defname)));
    3737                 :             :     return LOGICALREP_STREAM_OFF;   /* keep compiler quiet */
    3738                 :             : }
        

Generated by: LCOV version 2.0-1