LCOV - code coverage report
Current view: top level - src/backend/commands - subscriptioncmds.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 376 411 91.5 %
Date: 2020-06-01 09:07:10 Functions: 9 10 90.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * subscriptioncmds.c
       4             :  *      subscription catalog manipulation functions
       5             :  *
       6             :  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      subscriptioncmds.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/htup_details.h"
      18             : #include "access/table.h"
      19             : #include "access/xact.h"
      20             : #include "catalog/catalog.h"
      21             : #include "catalog/dependency.h"
      22             : #include "catalog/indexing.h"
      23             : #include "catalog/namespace.h"
      24             : #include "catalog/objectaccess.h"
      25             : #include "catalog/objectaddress.h"
      26             : #include "catalog/pg_subscription.h"
      27             : #include "catalog/pg_subscription_rel.h"
      28             : #include "catalog/pg_type.h"
      29             : #include "commands/defrem.h"
      30             : #include "commands/event_trigger.h"
      31             : #include "commands/subscriptioncmds.h"
      32             : #include "executor/executor.h"
      33             : #include "miscadmin.h"
      34             : #include "nodes/makefuncs.h"
      35             : #include "replication/logicallauncher.h"
      36             : #include "replication/origin.h"
      37             : #include "replication/walreceiver.h"
      38             : #include "replication/walsender.h"
      39             : #include "replication/worker_internal.h"
      40             : #include "storage/lmgr.h"
      41             : #include "utils/acl.h"
      42             : #include "utils/builtins.h"
      43             : #include "utils/guc.h"
      44             : #include "utils/lsyscache.h"
      45             : #include "utils/memutils.h"
      46             : #include "utils/syscache.h"
      47             : 
      48             : static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
      49             : 
      50             : /*
      51             :  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
      52             :  *
      53             :  * Since not all options can be specified in both commands, this function
      54             :  * will report an error on options if the target output pointer is NULL to
      55             :  * accommodate that.
      56             :  */
      57             : static void
      58         158 : parse_subscription_options(List *options, bool *connect, bool *enabled_given,
      59             :                            bool *enabled, bool *create_slot,
      60             :                            bool *slot_name_given, char **slot_name,
      61             :                            bool *copy_data, char **synchronous_commit,
      62             :                            bool *refresh)
      63             : {
      64             :     ListCell   *lc;
      65         158 :     bool        connect_given = false;
      66         158 :     bool        create_slot_given = false;
      67         158 :     bool        copy_data_given = false;
      68         158 :     bool        refresh_given = false;
      69             : 
      70             :     /* If connect is specified, the others also need to be. */
      71             :     Assert(!connect || (enabled && create_slot && copy_data));
      72             : 
      73         158 :     if (connect)
      74         108 :         *connect = true;
      75         158 :     if (enabled)
      76             :     {
      77         122 :         *enabled_given = false;
      78         122 :         *enabled = true;
      79             :     }
      80         158 :     if (create_slot)
      81         108 :         *create_slot = true;
      82         158 :     if (slot_name)
      83             :     {
      84         130 :         *slot_name_given = false;
      85         130 :         *slot_name = NULL;
      86             :     }
      87         158 :     if (copy_data)
      88         122 :         *copy_data = true;
      89         158 :     if (synchronous_commit)
      90         130 :         *synchronous_commit = NULL;
      91         158 :     if (refresh)
      92           6 :         *refresh = true;
      93             : 
      94             :     /* Parse options */
      95         302 :     foreach(lc, options)
      96             :     {
      97         152 :         DefElem    *defel = (DefElem *) lfirst(lc);
      98             : 
      99         152 :         if (strcmp(defel->defname, "connect") == 0 && connect)
     100             :         {
     101          40 :             if (connect_given)
     102           0 :                 ereport(ERROR,
     103             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     104             :                          errmsg("conflicting or redundant options")));
     105             : 
     106          40 :             connect_given = true;
     107          40 :             *connect = defGetBoolean(defel);
     108             :         }
     109         112 :         else if (strcmp(defel->defname, "enabled") == 0 && enabled)
     110             :         {
     111          26 :             if (*enabled_given)
     112           0 :                 ereport(ERROR,
     113             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     114             :                          errmsg("conflicting or redundant options")));
     115             : 
     116          26 :             *enabled_given = true;
     117          26 :             *enabled = defGetBoolean(defel);
     118             :         }
     119          86 :         else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
     120             :         {
     121          16 :             if (create_slot_given)
     122           0 :                 ereport(ERROR,
     123             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     124             :                          errmsg("conflicting or redundant options")));
     125             : 
     126          16 :             create_slot_given = true;
     127          16 :             *create_slot = defGetBoolean(defel);
     128             :         }
     129          70 :         else if (strcmp(defel->defname, "slot_name") == 0 && slot_name)
     130             :         {
     131          42 :             if (*slot_name_given)
     132           0 :                 ereport(ERROR,
     133             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     134             :                          errmsg("conflicting or redundant options")));
     135             : 
     136          42 :             *slot_name_given = true;
     137          42 :             *slot_name = defGetString(defel);
     138             : 
     139             :             /* Setting slot_name = NONE is treated as no slot name. */
     140          78 :             if (strcmp(*slot_name, "none") == 0)
     141          36 :                 *slot_name = NULL;
     142             :         }
     143          28 :         else if (strcmp(defel->defname, "copy_data") == 0 && copy_data)
     144             :         {
     145          12 :             if (copy_data_given)
     146           0 :                 ereport(ERROR,
     147             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     148             :                          errmsg("conflicting or redundant options")));
     149             : 
     150          12 :             copy_data_given = true;
     151          12 :             *copy_data = defGetBoolean(defel);
     152             :         }
     153          16 :         else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
     154             :                  synchronous_commit)
     155             :         {
     156           8 :             if (*synchronous_commit)
     157           0 :                 ereport(ERROR,
     158             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     159             :                          errmsg("conflicting or redundant options")));
     160             : 
     161           8 :             *synchronous_commit = defGetString(defel);
     162             : 
     163             :             /* Test if the given value is valid for synchronous_commit GUC. */
     164           8 :             (void) set_config_option("synchronous_commit", *synchronous_commit,
     165             :                                      PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
     166             :                                      false, 0, false);
     167             :         }
     168           8 :         else if (strcmp(defel->defname, "refresh") == 0 && refresh)
     169             :         {
     170           4 :             if (refresh_given)
     171           0 :                 ereport(ERROR,
     172             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     173             :                          errmsg("conflicting or redundant options")));
     174             : 
     175           4 :             refresh_given = true;
     176           4 :             *refresh = defGetBoolean(defel);
     177             :         }
     178             :         else
     179           4 :             ereport(ERROR,
     180             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     181             :                      errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
     182             :     }
     183             : 
     184             :     /*
     185             :      * We've been explicitly asked to not connect, that requires some
     186             :      * additional processing.
     187             :      */
     188         150 :     if (connect && !*connect)
     189             :     {
     190             :         /* Check for incompatible options from the user. */
     191          40 :         if (enabled && *enabled_given && *enabled)
     192           4 :             ereport(ERROR,
     193             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     194             :             /*- translator: both %s are strings of the form "option = value" */
     195             :                      errmsg("%s and %s are mutually exclusive options",
     196             :                             "connect = false", "enabled = true")));
     197             : 
     198          36 :         if (create_slot && create_slot_given && *create_slot)
     199           4 :             ereport(ERROR,
     200             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     201             :                      errmsg("%s and %s are mutually exclusive options",
     202             :                             "connect = false", "create_slot = true")));
     203             : 
     204          32 :         if (copy_data && copy_data_given && *copy_data)
     205           4 :             ereport(ERROR,
     206             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     207             :                      errmsg("%s and %s are mutually exclusive options",
     208             :                             "connect = false", "copy_data = true")));
     209             : 
     210             :         /* Change the defaults of other options. */
     211          28 :         *enabled = false;
     212          28 :         *create_slot = false;
     213          28 :         *copy_data = false;
     214             :     }
     215             : 
     216             :     /*
     217             :      * Do additional checking for disallowed combination when slot_name = NONE
     218             :      * was used.
     219             :      */
     220         138 :     if (slot_name && *slot_name_given && !*slot_name)
     221             :     {
     222          36 :         if (enabled && *enabled_given && *enabled)
     223           4 :             ereport(ERROR,
     224             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     225             :             /*- translator: both %s are strings of the form "option = value" */
     226             :                      errmsg("%s and %s are mutually exclusive options",
     227             :                             "slot_name = NONE", "enabled = true")));
     228             : 
     229          32 :         if (create_slot && create_slot_given && *create_slot)
     230           4 :             ereport(ERROR,
     231             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     232             :                      errmsg("%s and %s are mutually exclusive options",
     233             :                             "slot_name = NONE", "create_slot = true")));
     234             : 
     235          28 :         if (enabled && !*enabled_given && *enabled)
     236           8 :             ereport(ERROR,
     237             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     238             :             /*- translator: both %s are strings of the form "option = value" */
     239             :                      errmsg("subscription with %s must also set %s",
     240             :                             "slot_name = NONE", "enabled = false")));
     241             : 
     242          20 :         if (create_slot && !create_slot_given && *create_slot)
     243           4 :             ereport(ERROR,
     244             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     245             :                      errmsg("subscription with %s must also set %s",
     246             :                             "slot_name = NONE", "create_slot = false")));
     247             :     }
     248         118 : }
     249             : 
     250             : /*
     251             :  * Auxiliary function to build a text array out of a list of String nodes.
     252             :  */
     253             : static Datum
     254          66 : publicationListToArray(List *publist)
     255             : {
     256             :     ArrayType  *arr;
     257             :     Datum      *datums;
     258          66 :     int         j = 0;
     259             :     ListCell   *cell;
     260             :     MemoryContext memcxt;
     261             :     MemoryContext oldcxt;
     262             : 
     263             :     /* Create memory context for temporary allocations. */
     264          66 :     memcxt = AllocSetContextCreate(CurrentMemoryContext,
     265             :                                    "publicationListToArray to array",
     266             :                                    ALLOCSET_DEFAULT_SIZES);
     267          66 :     oldcxt = MemoryContextSwitchTo(memcxt);
     268             : 
     269          66 :     datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
     270             : 
     271         142 :     foreach(cell, publist)
     272             :     {
     273          80 :         char       *name = strVal(lfirst(cell));
     274             :         ListCell   *pcell;
     275             : 
     276             :         /* Check for duplicates. */
     277          90 :         foreach(pcell, publist)
     278             :         {
     279          90 :             char       *pname = strVal(lfirst(pcell));
     280             : 
     281          90 :             if (pcell == cell)
     282          76 :                 break;
     283             : 
     284          14 :             if (strcmp(name, pname) == 0)
     285           4 :                 ereport(ERROR,
     286             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     287             :                          errmsg("publication name \"%s\" used more than once",
     288             :                                 pname)));
     289             :         }
     290             : 
     291          76 :         datums[j++] = CStringGetTextDatum(name);
     292             :     }
     293             : 
     294          62 :     MemoryContextSwitchTo(oldcxt);
     295             : 
     296          62 :     arr = construct_array(datums, list_length(publist),
     297             :                           TEXTOID, -1, false, TYPALIGN_INT);
     298             : 
     299          62 :     MemoryContextDelete(memcxt);
     300             : 
     301          62 :     return PointerGetDatum(arr);
     302             : }
     303             : 
     304             : /*
     305             :  * Create new subscription.
     306             :  */
     307             : ObjectAddress
     308         108 : CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
     309             : {
     310             :     Relation    rel;
     311             :     ObjectAddress myself;
     312             :     Oid         subid;
     313             :     bool        nulls[Natts_pg_subscription];
     314             :     Datum       values[Natts_pg_subscription];
     315         108 :     Oid         owner = GetUserId();
     316             :     HeapTuple   tup;
     317             :     bool        connect;
     318             :     bool        enabled_given;
     319             :     bool        enabled;
     320             :     bool        copy_data;
     321             :     char       *synchronous_commit;
     322             :     char       *conninfo;
     323             :     char       *slotname;
     324             :     bool        slotname_given;
     325             :     char        originname[NAMEDATALEN];
     326             :     bool        create_slot;
     327             :     List       *publications;
     328             : 
     329             :     /*
     330             :      * Parse and check options.
     331             :      *
     332             :      * Connection and publication should not be specified here.
     333             :      */
     334         108 :     parse_subscription_options(stmt->options, &connect, &enabled_given,
     335             :                                &enabled, &create_slot, &slotname_given,
     336             :                                &slotname, &copy_data, &synchronous_commit,
     337             :                                NULL);
     338             : 
     339             :     /*
     340             :      * Since creating a replication slot is not transactional, rolling back
     341             :      * the transaction leaves the created replication slot.  So we cannot run
     342             :      * CREATE SUBSCRIPTION inside a transaction block if creating a
     343             :      * replication slot.
     344             :      */
     345          76 :     if (create_slot)
     346          48 :         PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
     347             : 
     348          72 :     if (!superuser())
     349           4 :         ereport(ERROR,
     350             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     351             :                  errmsg("must be superuser to create subscriptions")));
     352             : 
     353             :     /*
     354             :      * If built with appropriate switch, whine when regression-testing
     355             :      * conventions for subscription names are violated.
     356             :      */
     357             : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
     358             :     if (strncmp(stmt->subname, "regress_", 8) != 0)
     359             :         elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
     360             : #endif
     361             : 
     362          68 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     363             : 
     364             :     /* Check if name is used */
     365          68 :     subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
     366             :                             MyDatabaseId, CStringGetDatum(stmt->subname));
     367          68 :     if (OidIsValid(subid))
     368             :     {
     369           4 :         ereport(ERROR,
     370             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     371             :                  errmsg("subscription \"%s\" already exists",
     372             :                         stmt->subname)));
     373             :     }
     374             : 
     375          64 :     if (!slotname_given && slotname == NULL)
     376          52 :         slotname = stmt->subname;
     377             : 
     378             :     /* The default for synchronous_commit of subscriptions is off. */
     379          64 :     if (synchronous_commit == NULL)
     380          64 :         synchronous_commit = "off";
     381             : 
     382          64 :     conninfo = stmt->conninfo;
     383          64 :     publications = stmt->publication;
     384             : 
     385             :     /* Load the library providing us libpq calls. */
     386          64 :     load_file("libpqwalreceiver", false);
     387             : 
     388             :     /* Check the connection info string. */
     389          64 :     walrcv_check_conninfo(conninfo);
     390             : 
     391             :     /* Everything ok, form a new tuple. */
     392          60 :     memset(values, 0, sizeof(values));
     393          60 :     memset(nulls, false, sizeof(nulls));
     394             : 
     395          60 :     subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
     396             :                                Anum_pg_subscription_oid);
     397          60 :     values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
     398          60 :     values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
     399          60 :     values[Anum_pg_subscription_subname - 1] =
     400          60 :         DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
     401          60 :     values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
     402          60 :     values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
     403          60 :     values[Anum_pg_subscription_subconninfo - 1] =
     404          60 :         CStringGetTextDatum(conninfo);
     405          60 :     if (slotname)
     406          50 :         values[Anum_pg_subscription_subslotname - 1] =
     407          50 :             DirectFunctionCall1(namein, CStringGetDatum(slotname));
     408             :     else
     409          10 :         nulls[Anum_pg_subscription_subslotname - 1] = true;
     410          60 :     values[Anum_pg_subscription_subsynccommit - 1] =
     411          60 :         CStringGetTextDatum(synchronous_commit);
     412          56 :     values[Anum_pg_subscription_subpublications - 1] =
     413          60 :         publicationListToArray(publications);
     414             : 
     415          56 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     416             : 
     417             :     /* Insert tuple into catalog. */
     418          56 :     CatalogTupleInsert(rel, tup);
     419          56 :     heap_freetuple(tup);
     420             : 
     421          56 :     recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
     422             : 
     423          56 :     snprintf(originname, sizeof(originname), "pg_%u", subid);
     424          56 :     replorigin_create(originname);
     425             : 
     426             :     /*
     427             :      * Connect to remote side to execute requested commands and fetch table
     428             :      * info.
     429             :      */
     430          56 :     if (connect)
     431             :     {
     432             :         char       *err;
     433             :         WalReceiverConn *wrconn;
     434             :         List       *tables;
     435             :         ListCell   *lc;
     436             :         char        table_state;
     437             : 
     438             :         /* Try to connect to the publisher. */
     439          40 :         wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
     440          40 :         if (!wrconn)
     441           0 :             ereport(ERROR,
     442             :                     (errmsg("could not connect to the publisher: %s", err)));
     443             : 
     444          40 :         PG_TRY();
     445             :         {
     446             :             /*
     447             :              * Set sync state based on if we were asked to do data copy or
     448             :              * not.
     449             :              */
     450          40 :             table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
     451             : 
     452             :             /*
     453             :              * Get the table list from publisher and build local table status
     454             :              * info.
     455             :              */
     456          40 :             tables = fetch_table_list(wrconn, publications);
     457         134 :             foreach(lc, tables)
     458             :             {
     459          94 :                 RangeVar   *rv = (RangeVar *) lfirst(lc);
     460             :                 Oid         relid;
     461             : 
     462          94 :                 relid = RangeVarGetRelid(rv, AccessShareLock, false);
     463             : 
     464             :                 /* Check for supported relkind. */
     465          94 :                 CheckSubscriptionRelkind(get_rel_relkind(relid),
     466          94 :                                          rv->schemaname, rv->relname);
     467             : 
     468          94 :                 AddSubscriptionRelState(subid, relid, table_state,
     469             :                                         InvalidXLogRecPtr);
     470             :             }
     471             : 
     472             :             /*
     473             :              * If requested, create permanent slot for the subscription. We
     474             :              * won't use the initial snapshot for anything, so no need to
     475             :              * export it.
     476             :              */
     477          40 :             if (create_slot)
     478             :             {
     479             :                 Assert(slotname);
     480             : 
     481          40 :                 walrcv_create_slot(wrconn, slotname, false,
     482             :                                    CRS_NOEXPORT_SNAPSHOT, NULL);
     483          40 :                 ereport(NOTICE,
     484             :                         (errmsg("created replication slot \"%s\" on publisher",
     485             :                                 slotname)));
     486             :             }
     487             :         }
     488           0 :         PG_FINALLY();
     489             :         {
     490          40 :             walrcv_disconnect(wrconn);
     491             :         }
     492          40 :         PG_END_TRY();
     493             :     }
     494             :     else
     495          16 :         ereport(WARNING,
     496             :         /* translator: %s is an SQL ALTER statement */
     497             :                 (errmsg("tables were not subscribed, you will have to run %s to subscribe the tables",
     498             :                         "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
     499             : 
     500          56 :     table_close(rel, RowExclusiveLock);
     501             : 
     502          56 :     if (enabled)
     503          40 :         ApplyLauncherWakeupAtCommit();
     504             : 
     505          56 :     ObjectAddressSet(myself, SubscriptionRelationId, subid);
     506             : 
     507          56 :     InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
     508             : 
     509          56 :     return myself;
     510             : }
     511             : 
     512             : static void
     513          10 : AlterSubscription_refresh(Subscription *sub, bool copy_data)
     514             : {
     515             :     char       *err;
     516             :     List       *pubrel_names;
     517             :     List       *subrel_states;
     518             :     Oid        *subrel_local_oids;
     519             :     Oid        *pubrel_local_oids;
     520             :     ListCell   *lc;
     521             :     int         off;
     522             : 
     523             :     /* Load the library providing us libpq calls. */
     524          10 :     load_file("libpqwalreceiver", false);
     525             : 
     526             :     /* Try to connect to the publisher. */
     527          10 :     wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
     528          10 :     if (!wrconn)
     529           0 :         ereport(ERROR,
     530             :                 (errmsg("could not connect to the publisher: %s", err)));
     531             : 
     532             :     /* Get the table list from publisher. */
     533          10 :     pubrel_names = fetch_table_list(wrconn, sub->publications);
     534             : 
     535             :     /* We are done with the remote side, close connection. */
     536          10 :     walrcv_disconnect(wrconn);
     537             : 
     538             :     /* Get local table list. */
     539          10 :     subrel_states = GetSubscriptionRelations(sub->oid);
     540             : 
     541             :     /*
     542             :      * Build qsorted array of local table oids for faster lookup. This can
     543             :      * potentially contain all tables in the database so speed of lookup is
     544             :      * important.
     545             :      */
     546          10 :     subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
     547          10 :     off = 0;
     548          38 :     foreach(lc, subrel_states)
     549             :     {
     550          28 :         SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
     551             : 
     552          28 :         subrel_local_oids[off++] = relstate->relid;
     553             :     }
     554          10 :     qsort(subrel_local_oids, list_length(subrel_states),
     555             :           sizeof(Oid), oid_cmp);
     556             : 
     557             :     /*
     558             :      * Walk over the remote tables and try to match them to locally known
     559             :      * tables. If the table is not known locally create a new state for it.
     560             :      *
     561             :      * Also builds array of local oids of remote tables for the next step.
     562             :      */
     563          10 :     off = 0;
     564          10 :     pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
     565             : 
     566          30 :     foreach(lc, pubrel_names)
     567             :     {
     568          20 :         RangeVar   *rv = (RangeVar *) lfirst(lc);
     569             :         Oid         relid;
     570             : 
     571          20 :         relid = RangeVarGetRelid(rv, AccessShareLock, false);
     572             : 
     573             :         /* Check for supported relkind. */
     574          20 :         CheckSubscriptionRelkind(get_rel_relkind(relid),
     575          20 :                                  rv->schemaname, rv->relname);
     576             : 
     577          20 :         pubrel_local_oids[off++] = relid;
     578             : 
     579          20 :         if (!bsearch(&relid, subrel_local_oids,
     580          20 :                      list_length(subrel_states), sizeof(Oid), oid_cmp))
     581             :         {
     582          12 :             AddSubscriptionRelState(sub->oid, relid,
     583             :                                     copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
     584             :                                     InvalidXLogRecPtr);
     585          12 :             ereport(DEBUG1,
     586             :                     (errmsg("table \"%s.%s\" added to subscription \"%s\"",
     587             :                             rv->schemaname, rv->relname, sub->name)));
     588             :         }
     589             :     }
     590             : 
     591             :     /*
     592             :      * Next remove state for tables we should not care about anymore using the
     593             :      * data we collected above
     594             :      */
     595          10 :     qsort(pubrel_local_oids, list_length(pubrel_names),
     596             :           sizeof(Oid), oid_cmp);
     597             : 
     598          38 :     for (off = 0; off < list_length(subrel_states); off++)
     599             :     {
     600          28 :         Oid         relid = subrel_local_oids[off];
     601             : 
     602          28 :         if (!bsearch(&relid, pubrel_local_oids,
     603          28 :                      list_length(pubrel_names), sizeof(Oid), oid_cmp))
     604             :         {
     605          20 :             RemoveSubscriptionRel(sub->oid, relid);
     606             : 
     607          20 :             logicalrep_worker_stop_at_commit(sub->oid, relid);
     608             : 
     609          20 :             ereport(DEBUG1,
     610             :                     (errmsg("table \"%s.%s\" removed from subscription \"%s\"",
     611             :                             get_namespace_name(get_rel_namespace(relid)),
     612             :                             get_rel_name(relid),
     613             :                             sub->name)));
     614             :         }
     615             :     }
     616          10 : }
     617             : 
     618             : /*
     619             :  * Alter the existing subscription.
     620             :  */
     621             : ObjectAddress
     622          68 : AlterSubscription(AlterSubscriptionStmt *stmt)
     623             : {
     624             :     Relation    rel;
     625             :     ObjectAddress myself;
     626             :     bool        nulls[Natts_pg_subscription];
     627             :     bool        replaces[Natts_pg_subscription];
     628             :     Datum       values[Natts_pg_subscription];
     629             :     HeapTuple   tup;
     630             :     Oid         subid;
     631          68 :     bool        update_tuple = false;
     632             :     Subscription *sub;
     633             :     Form_pg_subscription form;
     634             : 
     635          68 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     636             : 
     637             :     /* Fetch the existing tuple. */
     638          68 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
     639             :                               CStringGetDatum(stmt->subname));
     640             : 
     641          68 :     if (!HeapTupleIsValid(tup))
     642           4 :         ereport(ERROR,
     643             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     644             :                  errmsg("subscription \"%s\" does not exist",
     645             :                         stmt->subname)));
     646             : 
     647          64 :     form = (Form_pg_subscription) GETSTRUCT(tup);
     648          64 :     subid = form->oid;
     649             : 
     650             :     /* must be owner */
     651          64 :     if (!pg_subscription_ownercheck(subid, GetUserId()))
     652           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
     653           0 :                        stmt->subname);
     654             : 
     655          64 :     sub = GetSubscription(subid, false);
     656             : 
     657             :     /* Lock the subscription so nobody else can do anything with it. */
     658          64 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
     659             : 
     660             :     /* Form a new tuple. */
     661          64 :     memset(values, 0, sizeof(values));
     662          64 :     memset(nulls, false, sizeof(nulls));
     663          64 :     memset(replaces, false, sizeof(replaces));
     664             : 
     665          64 :     switch (stmt->kind)
     666             :     {
     667          22 :         case ALTER_SUBSCRIPTION_OPTIONS:
     668             :             {
     669             :                 char       *slotname;
     670             :                 bool        slotname_given;
     671             :                 char       *synchronous_commit;
     672             : 
     673          22 :                 parse_subscription_options(stmt->options, NULL, NULL, NULL,
     674             :                                            NULL, &slotname_given, &slotname,
     675             :                                            NULL, &synchronous_commit, NULL);
     676             : 
     677          14 :                 if (slotname_given)
     678             :                 {
     679          10 :                     if (sub->enabled && !slotname)
     680           0 :                         ereport(ERROR,
     681             :                                 (errcode(ERRCODE_SYNTAX_ERROR),
     682             :                                  errmsg("cannot set %s for enabled subscription",
     683             :                                         "slot_name = NONE")));
     684             : 
     685          10 :                     if (slotname)
     686           4 :                         values[Anum_pg_subscription_subslotname - 1] =
     687           4 :                             DirectFunctionCall1(namein, CStringGetDatum(slotname));
     688             :                     else
     689           6 :                         nulls[Anum_pg_subscription_subslotname - 1] = true;
     690          10 :                     replaces[Anum_pg_subscription_subslotname - 1] = true;
     691             :                 }
     692             : 
     693          14 :                 if (synchronous_commit)
     694             :                 {
     695           4 :                     values[Anum_pg_subscription_subsynccommit - 1] =
     696           4 :                         CStringGetTextDatum(synchronous_commit);
     697           4 :                     replaces[Anum_pg_subscription_subsynccommit - 1] = true;
     698             :                 }
     699             : 
     700          14 :                 update_tuple = true;
     701          14 :                 break;
     702             :             }
     703             : 
     704          14 :         case ALTER_SUBSCRIPTION_ENABLED:
     705             :             {
     706             :                 bool        enabled,
     707             :                             enabled_given;
     708             : 
     709          14 :                 parse_subscription_options(stmt->options, NULL,
     710             :                                            &enabled_given, &enabled, NULL,
     711             :                                            NULL, NULL, NULL, NULL, NULL);
     712             :                 Assert(enabled_given);
     713             : 
     714          14 :                 if (!sub->slotname && enabled)
     715           4 :                     ereport(ERROR,
     716             :                             (errcode(ERRCODE_SYNTAX_ERROR),
     717             :                              errmsg("cannot enable subscription that does not have a slot name")));
     718             : 
     719          10 :                 values[Anum_pg_subscription_subenabled - 1] =
     720          10 :                     BoolGetDatum(enabled);
     721          10 :                 replaces[Anum_pg_subscription_subenabled - 1] = true;
     722             : 
     723          10 :                 if (enabled)
     724           4 :                     ApplyLauncherWakeupAtCommit();
     725             : 
     726          10 :                 update_tuple = true;
     727          10 :                 break;
     728             :             }
     729             : 
     730          10 :         case ALTER_SUBSCRIPTION_CONNECTION:
     731             :             /* Load the library providing us libpq calls. */
     732          10 :             load_file("libpqwalreceiver", false);
     733             :             /* Check the connection info string. */
     734          10 :             walrcv_check_conninfo(stmt->conninfo);
     735             : 
     736           6 :             values[Anum_pg_subscription_subconninfo - 1] =
     737           6 :                 CStringGetTextDatum(stmt->conninfo);
     738           6 :             replaces[Anum_pg_subscription_subconninfo - 1] = true;
     739           6 :             update_tuple = true;
     740           6 :             break;
     741             : 
     742           6 :         case ALTER_SUBSCRIPTION_PUBLICATION:
     743             :             {
     744             :                 bool        copy_data;
     745             :                 bool        refresh;
     746             : 
     747           6 :                 parse_subscription_options(stmt->options, NULL, NULL, NULL,
     748             :                                            NULL, NULL, NULL, &copy_data,
     749             :                                            NULL, &refresh);
     750             : 
     751           6 :                 values[Anum_pg_subscription_subpublications - 1] =
     752           6 :                     publicationListToArray(stmt->publication);
     753           6 :                 replaces[Anum_pg_subscription_subpublications - 1] = true;
     754             : 
     755           6 :                 update_tuple = true;
     756             : 
     757             :                 /* Refresh if user asked us to. */
     758           6 :                 if (refresh)
     759             :                 {
     760           2 :                     if (!sub->enabled)
     761           0 :                         ereport(ERROR,
     762             :                                 (errcode(ERRCODE_SYNTAX_ERROR),
     763             :                                  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
     764             :                                  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
     765             : 
     766             :                     /* Make sure refresh sees the new list of publications. */
     767           2 :                     sub->publications = stmt->publication;
     768             : 
     769           2 :                     AlterSubscription_refresh(sub, copy_data);
     770             :                 }
     771             : 
     772           6 :                 break;
     773             :             }
     774             : 
     775          12 :         case ALTER_SUBSCRIPTION_REFRESH:
     776             :             {
     777             :                 bool        copy_data;
     778             : 
     779          12 :                 if (!sub->enabled)
     780           4 :                     ereport(ERROR,
     781             :                             (errcode(ERRCODE_SYNTAX_ERROR),
     782             :                              errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
     783             : 
     784           8 :                 parse_subscription_options(stmt->options, NULL, NULL, NULL,
     785             :                                            NULL, NULL, NULL, &copy_data,
     786             :                                            NULL, NULL);
     787             : 
     788           8 :                 AlterSubscription_refresh(sub, copy_data);
     789             : 
     790           8 :                 break;
     791             :             }
     792             : 
     793           0 :         default:
     794           0 :             elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
     795             :                  stmt->kind);
     796             :     }
     797             : 
     798             :     /* Update the catalog if needed. */
     799          44 :     if (update_tuple)
     800             :     {
     801          36 :         tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     802             :                                 replaces);
     803             : 
     804          36 :         CatalogTupleUpdate(rel, &tup->t_self, tup);
     805             : 
     806          36 :         heap_freetuple(tup);
     807             :     }
     808             : 
     809          44 :     table_close(rel, RowExclusiveLock);
     810             : 
     811          44 :     ObjectAddressSet(myself, SubscriptionRelationId, subid);
     812             : 
     813          44 :     InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
     814             : 
     815          44 :     return myself;
     816             : }
     817             : 
     818             : /*
     819             :  * Drop a subscription
     820             :  */
     821             : void
     822          42 : DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
     823             : {
     824             :     Relation    rel;
     825             :     ObjectAddress myself;
     826             :     HeapTuple   tup;
     827             :     Oid         subid;
     828             :     Datum       datum;
     829             :     bool        isnull;
     830             :     char       *subname;
     831             :     char       *conninfo;
     832             :     char       *slotname;
     833             :     List       *subworkers;
     834             :     ListCell   *lc;
     835             :     char        originname[NAMEDATALEN];
     836          42 :     char       *err = NULL;
     837             :     RepOriginId originid;
     838          42 :     WalReceiverConn *wrconn = NULL;
     839             :     StringInfoData cmd;
     840             :     Form_pg_subscription form;
     841             : 
     842             :     /*
     843             :      * Lock pg_subscription with AccessExclusiveLock to ensure that the
     844             :      * launcher doesn't restart new worker during dropping the subscription
     845             :      */
     846          42 :     rel = table_open(SubscriptionRelationId, AccessExclusiveLock);
     847             : 
     848          84 :     tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
     849          42 :                           CStringGetDatum(stmt->subname));
     850             : 
     851          42 :     if (!HeapTupleIsValid(tup))
     852             :     {
     853           8 :         table_close(rel, NoLock);
     854             : 
     855           8 :         if (!stmt->missing_ok)
     856           4 :             ereport(ERROR,
     857             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
     858             :                      errmsg("subscription \"%s\" does not exist",
     859             :                             stmt->subname)));
     860             :         else
     861           4 :             ereport(NOTICE,
     862             :                     (errmsg("subscription \"%s\" does not exist, skipping",
     863             :                             stmt->subname)));
     864             : 
     865          20 :         return;
     866             :     }
     867             : 
     868          34 :     form = (Form_pg_subscription) GETSTRUCT(tup);
     869          34 :     subid = form->oid;
     870             : 
     871             :     /* must be owner */
     872          34 :     if (!pg_subscription_ownercheck(subid, GetUserId()))
     873           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
     874           0 :                        stmt->subname);
     875             : 
     876             :     /* DROP hook for the subscription being removed */
     877          34 :     InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
     878             : 
     879             :     /*
     880             :      * Lock the subscription so nobody else can do anything with it (including
     881             :      * the replication workers).
     882             :      */
     883          34 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
     884             : 
     885             :     /* Get subname */
     886          34 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
     887             :                             Anum_pg_subscription_subname, &isnull);
     888             :     Assert(!isnull);
     889          34 :     subname = pstrdup(NameStr(*DatumGetName(datum)));
     890             : 
     891             :     /* Get conninfo */
     892          34 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
     893             :                             Anum_pg_subscription_subconninfo, &isnull);
     894             :     Assert(!isnull);
     895          34 :     conninfo = TextDatumGetCString(datum);
     896             : 
     897             :     /* Get slotname */
     898          34 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
     899             :                             Anum_pg_subscription_subslotname, &isnull);
     900          34 :     if (!isnull)
     901          18 :         slotname = pstrdup(NameStr(*DatumGetName(datum)));
     902             :     else
     903          16 :         slotname = NULL;
     904             : 
     905             :     /*
     906             :      * Since dropping a replication slot is not transactional, the replication
     907             :      * slot stays dropped even if the transaction rolls back.  So we cannot
     908             :      * run DROP SUBSCRIPTION inside a transaction block if dropping the
     909             :      * replication slot.
     910             :      *
     911             :      * XXX The command name should really be something like "DROP SUBSCRIPTION
     912             :      * of a subscription that is associated with a replication slot", but we
     913             :      * don't have the proper facilities for that.
     914             :      */
     915          34 :     if (slotname)
     916          18 :         PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
     917             : 
     918          30 :     ObjectAddressSet(myself, SubscriptionRelationId, subid);
     919          30 :     EventTriggerSQLDropAddObject(&myself, true, true);
     920             : 
     921             :     /* Remove the tuple from catalog. */
     922          30 :     CatalogTupleDelete(rel, &tup->t_self);
     923             : 
     924          30 :     ReleaseSysCache(tup);
     925             : 
     926             :     /*
     927             :      * Stop all the subscription workers immediately.
     928             :      *
     929             :      * This is necessary if we are dropping the replication slot, so that the
     930             :      * slot becomes accessible.
     931             :      *
     932             :      * It is also necessary if the subscription is disabled and was disabled
     933             :      * in the same transaction.  Then the workers haven't seen the disabling
     934             :      * yet and will still be running, leading to hangs later when we want to
     935             :      * drop the replication origin.  If the subscription was disabled before
     936             :      * this transaction, then there shouldn't be any workers left, so this
     937             :      * won't make a difference.
     938             :      *
     939             :      * New workers won't be started because we hold an exclusive lock on the
     940             :      * subscription till the end of the transaction.
     941             :      */
     942          30 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     943          30 :     subworkers = logicalrep_workers_find(subid, false);
     944          30 :     LWLockRelease(LogicalRepWorkerLock);
     945          46 :     foreach(lc, subworkers)
     946             :     {
     947          16 :         LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
     948             : 
     949          16 :         logicalrep_worker_stop(w->subid, w->relid);
     950             :     }
     951          30 :     list_free(subworkers);
     952             : 
     953             :     /* Clean up dependencies */
     954          30 :     deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
     955             : 
     956             :     /* Remove any associated relation synchronization states. */
     957          30 :     RemoveSubscriptionRel(subid, InvalidOid);
     958             : 
     959             :     /* Remove the origin tracking if exists. */
     960          30 :     snprintf(originname, sizeof(originname), "pg_%u", subid);
     961          30 :     originid = replorigin_by_name(originname, true);
     962          30 :     if (originid != InvalidRepOriginId)
     963          30 :         replorigin_drop(originid, false);
     964             : 
     965             :     /*
     966             :      * If there is no slot associated with the subscription, we can finish
     967             :      * here.
     968             :      */
     969          30 :     if (!slotname)
     970             :     {
     971          16 :         table_close(rel, NoLock);
     972          16 :         return;
     973             :     }
     974             : 
     975             :     /*
     976             :      * Otherwise drop the replication slot at the publisher node using the
     977             :      * replication connection.
     978             :      */
     979          14 :     load_file("libpqwalreceiver", false);
     980             : 
     981          14 :     initStringInfo(&cmd);
     982          14 :     appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
     983             : 
     984          14 :     wrconn = walrcv_connect(conninfo, true, subname, &err);
     985          14 :     if (wrconn == NULL)
     986           0 :         ereport(ERROR,
     987             :                 (errmsg("could not connect to publisher when attempting to "
     988             :                         "drop the replication slot \"%s\"", slotname),
     989             :                  errdetail("The error was: %s", err),
     990             :         /* translator: %s is an SQL ALTER command */
     991             :                  errhint("Use %s to disassociate the subscription from the slot.",
     992             :                          "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
     993             : 
     994          14 :     PG_TRY();
     995             :     {
     996             :         WalRcvExecResult *res;
     997             : 
     998          14 :         res = walrcv_exec(wrconn, cmd.data, 0, NULL);
     999             : 
    1000          14 :         if (res->status != WALRCV_OK_COMMAND)
    1001           0 :             ereport(ERROR,
    1002             :                     (errmsg("could not drop the replication slot \"%s\" on publisher",
    1003             :                             slotname),
    1004             :                      errdetail("The error was: %s", res->err)));
    1005             :         else
    1006          14 :             ereport(NOTICE,
    1007             :                     (errmsg("dropped replication slot \"%s\" on publisher",
    1008             :                             slotname)));
    1009             : 
    1010          14 :         walrcv_clear_result(res);
    1011             :     }
    1012           0 :     PG_FINALLY();
    1013             :     {
    1014          14 :         walrcv_disconnect(wrconn);
    1015             :     }
    1016          14 :     PG_END_TRY();
    1017             : 
    1018          14 :     pfree(cmd.data);
    1019             : 
    1020          14 :     table_close(rel, NoLock);
    1021             : }
    1022             : 
    1023             : /*
    1024             :  * Internal workhorse for changing a subscription owner
    1025             :  */
    1026             : static void
    1027           8 : AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
    1028             : {
    1029             :     Form_pg_subscription form;
    1030             : 
    1031           8 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    1032             : 
    1033           8 :     if (form->subowner == newOwnerId)
    1034           0 :         return;
    1035             : 
    1036           8 :     if (!pg_subscription_ownercheck(form->oid, GetUserId()))
    1037           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
    1038           0 :                        NameStr(form->subname));
    1039             : 
    1040             :     /* New owner must be a superuser */
    1041           8 :     if (!superuser_arg(newOwnerId))
    1042           4 :         ereport(ERROR,
    1043             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1044             :                  errmsg("permission denied to change owner of subscription \"%s\"",
    1045             :                         NameStr(form->subname)),
    1046             :                  errhint("The owner of a subscription must be a superuser.")));
    1047             : 
    1048           4 :     form->subowner = newOwnerId;
    1049           4 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    1050             : 
    1051             :     /* Update owner dependency reference */
    1052           4 :     changeDependencyOnOwner(SubscriptionRelationId,
    1053             :                             form->oid,
    1054             :                             newOwnerId);
    1055             : 
    1056           4 :     InvokeObjectPostAlterHook(SubscriptionRelationId,
    1057             :                               form->oid, 0);
    1058             : }
    1059             : 
    1060             : /*
    1061             :  * Change subscription owner -- by name
    1062             :  */
    1063             : ObjectAddress
    1064           8 : AlterSubscriptionOwner(const char *name, Oid newOwnerId)
    1065             : {
    1066             :     Oid         subid;
    1067             :     HeapTuple   tup;
    1068             :     Relation    rel;
    1069             :     ObjectAddress address;
    1070             :     Form_pg_subscription form;
    1071             : 
    1072           8 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    1073             : 
    1074           8 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
    1075             :                               CStringGetDatum(name));
    1076             : 
    1077           8 :     if (!HeapTupleIsValid(tup))
    1078           0 :         ereport(ERROR,
    1079             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1080             :                  errmsg("subscription \"%s\" does not exist", name)));
    1081             : 
    1082           8 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    1083           8 :     subid = form->oid;
    1084             : 
    1085           8 :     AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
    1086             : 
    1087           4 :     ObjectAddressSet(address, SubscriptionRelationId, subid);
    1088             : 
    1089           4 :     heap_freetuple(tup);
    1090             : 
    1091           4 :     table_close(rel, RowExclusiveLock);
    1092             : 
    1093           4 :     return address;
    1094             : }
    1095             : 
    1096             : /*
    1097             :  * Change subscription owner -- by OID
    1098             :  */
    1099             : void
    1100           0 : AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
    1101             : {
    1102             :     HeapTuple   tup;
    1103             :     Relation    rel;
    1104             : 
    1105           0 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    1106             : 
    1107           0 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
    1108             : 
    1109           0 :     if (!HeapTupleIsValid(tup))
    1110           0 :         ereport(ERROR,
    1111             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1112             :                  errmsg("subscription with OID %u does not exist", subid)));
    1113             : 
    1114           0 :     AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
    1115             : 
    1116           0 :     heap_freetuple(tup);
    1117             : 
    1118           0 :     table_close(rel, RowExclusiveLock);
    1119           0 : }
    1120             : 
    1121             : /*
    1122             :  * Get the list of tables which belong to specified publications on the
    1123             :  * publisher connection.
    1124             :  */
    1125             : static List *
    1126          50 : fetch_table_list(WalReceiverConn *wrconn, List *publications)
    1127             : {
    1128             :     WalRcvExecResult *res;
    1129             :     StringInfoData cmd;
    1130             :     TupleTableSlot *slot;
    1131          50 :     Oid         tableRow[2] = {TEXTOID, TEXTOID};
    1132             :     ListCell   *lc;
    1133             :     bool        first;
    1134          50 :     List       *tablelist = NIL;
    1135             : 
    1136             :     Assert(list_length(publications) > 0);
    1137             : 
    1138          50 :     initStringInfo(&cmd);
    1139          50 :     appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
    1140             :                            "  FROM pg_catalog.pg_publication_tables t\n"
    1141             :                            " WHERE t.pubname IN (");
    1142          50 :     first = true;
    1143         102 :     foreach(lc, publications)
    1144             :     {
    1145          52 :         char       *pubname = strVal(lfirst(lc));
    1146             : 
    1147          52 :         if (first)
    1148          50 :             first = false;
    1149             :         else
    1150           2 :             appendStringInfoString(&cmd, ", ");
    1151             : 
    1152          52 :         appendStringInfoString(&cmd, quote_literal_cstr(pubname));
    1153             :     }
    1154          50 :     appendStringInfoChar(&cmd, ')');
    1155             : 
    1156          50 :     res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
    1157          50 :     pfree(cmd.data);
    1158             : 
    1159          50 :     if (res->status != WALRCV_OK_TUPLES)
    1160           0 :         ereport(ERROR,
    1161             :                 (errmsg("could not receive list of replicated tables from the publisher: %s",
    1162             :                         res->err)));
    1163             : 
    1164             :     /* Process tables. */
    1165          50 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    1166         164 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    1167             :     {
    1168             :         char       *nspname;
    1169             :         char       *relname;
    1170             :         bool        isnull;
    1171             :         RangeVar   *rv;
    1172             : 
    1173         114 :         nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
    1174             :         Assert(!isnull);
    1175         114 :         relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
    1176             :         Assert(!isnull);
    1177             : 
    1178         114 :         rv = makeRangeVar(pstrdup(nspname), pstrdup(relname), -1);
    1179         114 :         tablelist = lappend(tablelist, rv);
    1180             : 
    1181         114 :         ExecClearTuple(slot);
    1182             :     }
    1183          50 :     ExecDropSingleTupleTableSlot(slot);
    1184             : 
    1185          50 :     walrcv_clear_result(res);
    1186             : 
    1187          50 :     return tablelist;
    1188             : }

Generated by: LCOV version 1.13