LCOV - code coverage report
Current view: top level - src/backend/replication/logical - tablesync.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 92.0 % 485 446
Test Date: 2026-03-12 02:14:33 Functions: 100.0 % 16 16
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  * tablesync.c
       3              :  *    PostgreSQL logical replication: initial table data synchronization
       4              :  *
       5              :  * Copyright (c) 2012-2026, PostgreSQL Global Development Group
       6              :  *
       7              :  * IDENTIFICATION
       8              :  *    src/backend/replication/logical/tablesync.c
       9              :  *
      10              :  * NOTES
      11              :  *    This file contains code for initial table data synchronization for
      12              :  *    logical replication.
      13              :  *
      14              :  *    The initial data synchronization is done separately for each table,
      15              :  *    in a separate apply worker that only fetches the initial snapshot data
      16              :  *    from the publisher and then synchronizes the position in the stream with
      17              :  *    the leader apply worker.
      18              :  *
      19              :  *    There are several reasons for doing the synchronization this way:
      20              :  *     - It allows us to parallelize the initial data synchronization
      21              :  *       which lowers the time needed for it to happen.
      22              :  *     - The initial synchronization does not have to hold the xid and LSN
      23              :  *       for the time it takes to copy data of all tables, causing less
      24              :  *       bloat and lower disk consumption compared to doing the
      25              :  *       synchronization in a single process for the whole database.
      26              :  *     - It allows us to synchronize any tables added after the initial
      27              :  *       synchronization has finished.
      28              :  *
      29              :  *    The stream position synchronization works in multiple steps:
      30              :  *     - Apply worker requests a tablesync worker to start, setting the new
      31              :  *       table state to INIT.
      32              :  *     - Tablesync worker starts; changes table state from INIT to DATASYNC while
      33              :  *       copying.
      34              :  *     - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
      35              :  *       worker specific) state to indicate when the copy phase has completed, so
      36              :  *       if the worker crashes with this (non-memory) state then the copy will not
      37              :  *       be re-attempted.
      38              :  *     - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
      39              :  *     - Apply worker periodically checks for tables in SYNCWAIT state.  When
      40              :  *       any appear, it sets the table state to CATCHUP and starts loop-waiting
      41              :  *       until either the table state is set to SYNCDONE or the sync worker
      42              :  *       exits.
      43              :  *     - After the sync worker has seen the state change to CATCHUP, it will
      44              :  *       read the stream and apply changes (acting like an apply worker) until
      45              :  *       it catches up to the specified stream position.  Then it sets the
      46              :  *       state to SYNCDONE.  There might be zero changes applied between
      47              :  *       CATCHUP and SYNCDONE, because the sync worker might be ahead of the
      48              :  *       apply worker.
      49              :  *     - Once the state is set to SYNCDONE, the apply worker will continue
      50              :  *       tracking the table until it reaches the SYNCDONE stream position,
      51              :  *       at which point it sets the state to READY and stops tracking.
      52              :  *       Again, there might be zero changes in between.
      53              :  *
      54              :  *    So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
      55              :  *    -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
      56              :  *
      57              :  *    The catalog pg_subscription_rel is used to keep information about
      58              :  *    subscribed tables and their state.  The catalog holds all states
      59              :  *    except SYNCWAIT and CATCHUP, which are only in shared memory.
      60              :  *
      61              :  *    Example flows look like this:
      62              :  *     - Apply is in front:
      63              :  *        sync:8
      64              :  *          -> set in catalog FINISHEDCOPY
      65              :  *          -> set in memory SYNCWAIT
      66              :  *        apply:10
      67              :  *          -> set in memory CATCHUP
      68              :  *          -> enter wait-loop
      69              :  *        sync:10
      70              :  *          -> set in catalog SYNCDONE
      71              :  *          -> exit
      72              :  *        apply:10
      73              :  *          -> exit wait-loop
      74              :  *          -> continue rep
      75              :  *        apply:11
      76              :  *          -> set in catalog READY
      77              :  *
      78              :  *     - Sync is in front:
      79              :  *        sync:10
      80              :  *          -> set in catalog FINISHEDCOPY
      81              :  *          -> set in memory SYNCWAIT
      82              :  *        apply:8
      83              :  *          -> set in memory CATCHUP
      84              :  *          -> continue per-table filtering
      85              :  *        sync:10
      86              :  *          -> set in catalog SYNCDONE
      87              :  *          -> exit
      88              :  *        apply:10
      89              :  *          -> set in catalog READY
      90              :  *          -> stop per-table filtering
      91              :  *          -> continue rep
      92              :  *-------------------------------------------------------------------------
      93              :  */
      94              : 
      95              : #include "postgres.h"
      96              : 
      97              : #include "access/table.h"
      98              : #include "access/xact.h"
      99              : #include "catalog/indexing.h"
     100              : #include "catalog/pg_subscription_rel.h"
     101              : #include "catalog/pg_type.h"
     102              : #include "commands/copy.h"
     103              : #include "miscadmin.h"
     104              : #include "nodes/makefuncs.h"
     105              : #include "parser/parse_relation.h"
     106              : #include "pgstat.h"
     107              : #include "replication/logicallauncher.h"
     108              : #include "replication/logicalrelation.h"
     109              : #include "replication/logicalworker.h"
     110              : #include "replication/origin.h"
     111              : #include "replication/slot.h"
     112              : #include "replication/walreceiver.h"
     113              : #include "replication/worker_internal.h"
     114              : #include "storage/ipc.h"
     115              : #include "storage/latch.h"
     116              : #include "storage/lmgr.h"
     117              : #include "utils/acl.h"
     118              : #include "utils/array.h"
     119              : #include "utils/builtins.h"
     120              : #include "utils/lsyscache.h"
     121              : #include "utils/rls.h"
     122              : #include "utils/snapmgr.h"
     123              : #include "utils/syscache.h"
     124              : #include "utils/usercontext.h"
     125              : #include "utils/wait_event.h"
     126              : 
     127              : List       *table_states_not_ready = NIL;
     128              : 
     129              : static StringInfo copybuf = NULL;
     130              : 
     131              : /*
     132              :  * Wait until the sync state of relation 'relid', as recorded in the catalog
     133              :  * for our subscription, equals 'expected_state'; return true when it does.
     134              :  *
     135              :  * Returns false if the table sync worker or the table itself has
     136              :  * disappeared, or the table state has been reset.
     137              :  *
     138              :  * Currently, this is used in the apply worker when transitioning from
     139              :  * CATCHUP state to SYNCDONE.
     140              :  */
     141              : static bool
     142          192 : wait_for_table_state_change(Oid relid, char expected_state)
     143              : {
     144              :     char        state;
     145              : 
     146              :     for (;;)
     147          223 :     {
     148              :         LogicalRepWorker *worker;
     149              :         XLogRecPtr  statelsn;
     150              : 
     151          415 :         CHECK_FOR_INTERRUPTS();
     152              :         /* Drop the catalog snapshot before re-reading the relation state. */
     153          415 :         InvalidateCatalogSnapshot();
     154          415 :         state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
     155              :                                         relid, &statelsn);
     156              : 
     157          415 :         if (state == SUBREL_STATE_UNKNOWN)
     158            0 :             break;
     159              : 
     160          415 :         if (state == expected_state)
     161          192 :             return true;
     162              : 
     163              :         /* Check if the sync worker is still running and bail if not. */
     164          223 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     165          223 :         worker = logicalrep_worker_find(WORKERTYPE_TABLESYNC,
     166          223 :                                         MyLogicalRepWorker->subid, relid,
     167              :                                         false);
     168          223 :         LWLockRelease(LogicalRepWorkerLock);
     169          223 :         if (!worker)
     170            0 :             break;
     171              :         /* Sleep until our latch is set or 1s has elapsed, then re-check. */
     172          223 :         (void) WaitLatch(MyLatch,
     173              :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     174              :                          1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
     175              : 
     176          223 :         ResetLatch(MyLatch);
     177              :     }
     178              :     /* Left the loop via break: state is unknown or the sync worker is gone. */
     179            0 :     return false;
     180              : }
     181              : 
     182              : /*
     183              :  * Wait until MyLogicalRepWorker->relstate, the state of our synchronization
     184              :  * worker, equals 'expected_state'; return true when it does.
     185              :  *
     186              :  * Used when transitioning from SYNCWAIT state to CATCHUP.
     187              :  *
     188              :  * Returns false if no apply worker for our subscription can be found.
     189              :  */
     190              : static bool
     191          194 : wait_for_worker_state_change(char expected_state)
     192              : {
     193              :     int         rc;
     194              : 
     195              :     for (;;)
     196          194 :     {
     197              :         LogicalRepWorker *worker;
     198              : 
     199          388 :         CHECK_FOR_INTERRUPTS();
     200              : 
     201              :         /*
     202              :          * Done if already in correct state.  (We assume this fetch is atomic
     203              :          * enough to not give a misleading answer if we do it with no lock.)
     204              :          */
     205          388 :         if (MyLogicalRepWorker->relstate == expected_state)
     206          194 :             return true;
     207              : 
     208              :         /*
     209              :          * Bail out if the apply worker has died, else signal it we're
     210              :          * waiting.
     211              :          */
     212          194 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     213          194 :         worker = logicalrep_worker_find(WORKERTYPE_APPLY,
     214          194 :                                         MyLogicalRepWorker->subid, InvalidOid,
     215              :                                         false);
     216          194 :         if (worker && worker->proc)
     217          194 :             logicalrep_worker_wakeup_ptr(worker);
     218          194 :         LWLockRelease(LogicalRepWorkerLock);
     219          194 :         if (!worker)
     220            0 :             break;
     221              : 
     222              :         /*
     223              :          * Wait.  We expect to get a latch signal back from the apply worker,
     224              :          * but use a timeout in case it dies without sending one.
     225              :          */
     226          194 :         rc = WaitLatch(MyLatch,
     227              :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     228              :                        1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
     229              :         /* Reset the latch only if it was set, i.e. not after a plain timeout. */
     230          194 :         if (rc & WL_LATCH_SET)
     231          194 :             ResetLatch(MyLatch);
     232              :     }
     233              :     /* Left the loop via break: the apply worker could not be found. */
     234            0 :     return false;
     235              : }
     236              : 
     237              : /*
     238              :  * Handle table synchronization cooperation from the synchronization
     239              :  * worker.
     240              :  *
     241              :  * If the sync worker is in CATCHUP state and 'current_lsn' has reached (or
     242              :  * passed) the synchronization point stored in relstate_lsn, mark the table
     243              :  * as SYNCDONE, clean up the tablesync slot and origin, and finish.
     244              :  */
     245              : void
     246          228 : ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
     247              : {
     248          228 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
     249              :     /* The state check and update below are done while holding relmutex. */
     250          228 :     if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
     251          228 :         current_lsn >= MyLogicalRepWorker->relstate_lsn)
     252              :     {
     253              :         TimeLineID  tli;
     254          194 :         char        syncslotname[NAMEDATALEN] = {0};
     255          194 :         char        originname[NAMEDATALEN] = {0};
     256              : 
     257          194 :         MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
     258          194 :         MyLogicalRepWorker->relstate_lsn = current_lsn;
     259              : 
     260          194 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
     261              : 
     262              :         /*
     263              :          * UpdateSubscriptionRelState must be called within a transaction.
     264              :          */
     265          194 :         if (!IsTransactionState())
     266          194 :             StartTransactionCommand();
     267              : 
     268          194 :         UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
     269          194 :                                    MyLogicalRepWorker->relid,
     270          194 :                                    MyLogicalRepWorker->relstate,
     271          194 :                                    MyLogicalRepWorker->relstate_lsn,
     272              :                                    false);
     273              : 
     274              :         /*
     275              :          * End streaming so that LogRepWorkerWalRcvConn can be used to drop
     276              :          * the slot.
     277              :          */
     278          194 :         walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
     279              : 
     280              :         /*
     281              :          * Clean up the tablesync slot.
     282              :          *
     283              :          * This has to be done after updating the state because otherwise, if
     284              :          * there is an error while doing the database operations, we won't be
     285              :          * able to roll back the dropped slot.
     286              :          */
     287          194 :         ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
     288          194 :                                         MyLogicalRepWorker->relid,
     289              :                                         syncslotname,
     290              :                                         sizeof(syncslotname));
     291              : 
     292              :         /*
     293              :          * It is important to give an error if we are unable to drop the slot;
     294              :          * otherwise, it won't be dropped until the corresponding subscription
     295              :          * is dropped.  So pass missing_ok = false.
     296              :          */
     297          194 :         ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
     298              : 
     299          194 :         CommitTransactionCommand();
     300          194 :         pgstat_report_stat(false);
     301              : 
     302              :         /*
     303              :          * Start a new transaction to clean up the tablesync origin tracking.
     304              :          * This transaction will be ended within FinishSyncWorker().  Even if
     305              :          * we fail to remove the origin here, the apply worker will make sure
     306              :          * to clean it up afterward.
     307              :          *
     308              :          * We need to do this after the table state is set to SYNCDONE.
     309              :          * Otherwise, if an error occurs while performing the database
     310              :          * operation, the worker will be restarted and the in-memory state of
     311              :          * replication progress (remote_lsn) won't be rolled-back which would
     312              :          * have been cleared before restart. So, the restarted worker will use
     313              :          * invalid replication progress state resulting in replay of
     314              :          * transactions that have already been applied.
     315              :          */
     316          194 :         StartTransactionCommand();
     317              : 
     318          194 :         ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
     319          194 :                                            MyLogicalRepWorker->relid,
     320              :                                            originname,
     321              :                                            sizeof(originname));
     322              : 
     323              :         /*
     324              :          * Resetting the origin session removes the ownership of the slot.
     325              :          * This is needed to allow the origin to be dropped.
     326              :          */
     327          194 :         replorigin_session_reset();
     328          194 :         replorigin_xact_clear(true);
     329              : 
     330              :         /*
     331              :          * Drop the tablesync's origin tracking if it exists.
     332              :          *
     333              :          * There is a chance that the user is concurrently performing refresh
     334              :          * for the subscription where we remove the table state and its origin
     335              :          * or the apply worker would have removed this origin. So passing
     336              :          * missing_ok = true.
     337              :          */
     338          194 :         replorigin_drop_by_name(originname, true, false);
     339              : 
     340          194 :         FinishSyncWorker();
     341              :     }
     342              :     else
     343           34 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
     344           34 : }
     345              : 
     346              : /*
     347              :  * Handle table synchronization cooperation from the apply worker.
     348              :  *
     349              :  * Walk over all subscription tables that are individually tracked by the
     350              :  * apply process (currently, all that have state other than
     351              :  * SUBREL_STATE_READY) and manage synchronization for them.
     352              :  *
     353              :  * If there are tables that need synchronizing and are not being synchronized
     354              :  * yet, start sync workers for them (if there are free slots for sync
     355              :  * workers).  To prevent starting the sync worker for the same relation at a
     356              :  * high frequency after a failure, we store its last start time with each sync
     357              :  * state info.  We start the sync worker for the same relation after waiting
     358              :  * at least wal_retrieve_retry_interval.
     359              :  *
     360              :  * For tables that are being synchronized already, check if sync workers
     361              :  * either need action from the apply worker or have finished.  This is the
     362              :  * SYNCWAIT to CATCHUP transition.
     363              :  *
     364              :  * If the synchronization position is reached (SYNCDONE), then the table can
     365              :  * be marked as READY and is no longer tracked.
     366              :  */
     367              : void
     368         4336 : ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
     369              : {
     370              :     struct tablesync_start_time_mapping
     371              :     {
     372              :         Oid         relid;
     373              :         TimestampTz last_start_time;
     374              :     };
     375              :     static HTAB *last_start_times = NULL;
     376              :     ListCell   *lc;
     377              :     bool        started_tx;
     378         4336 :     bool        should_exit = false;
     379         4336 :     Relation    rel = NULL;
     380              : 
     381              :     Assert(!IsTransactionState());
     382              : 
     383              :     /* We need up-to-date sync state info for subscription tables here. */
     384         4336 :     FetchRelationStates(NULL, NULL, &started_tx);
     385              : 
     386              :     /*
     387              :      * Prepare a hash table for tracking last start times of workers, to avoid
     388              :      * immediate restarts.  We don't need it if there are no tables that need
     389              :      * syncing.
     390              :      */
     391         4336 :     if (table_states_not_ready != NIL && !last_start_times)
     392          126 :     {
     393              :         HASHCTL     ctl;
     394              : 
     395          126 :         ctl.keysize = sizeof(Oid);
     396          126 :         ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
     397          126 :         last_start_times = hash_create("Logical replication table sync worker start times",
     398              :                                        256, &ctl, HASH_ELEM | HASH_BLOBS);
     399              :     }
     400              : 
     401              :     /*
     402              :      * Clean up the hash table when we're done with all tables (just to
     403              :      * release the bit of memory).
     404              :      */
     405         4210 :     else if (table_states_not_ready == NIL && last_start_times)
     406              :     {
     407           98 :         hash_destroy(last_start_times);
     408           98 :         last_start_times = NULL;
     409              :     }
     410              : 
     411              :     /*
     412              :      * Process all tables that are being synchronized.
     413              :      */
     414         6101 :     foreach(lc, table_states_not_ready)
     415              :     {
     416         1765 :         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
     417              : 
     418         1765 :         if (!started_tx)
     419              :         {
     420          325 :             StartTransactionCommand();
     421          325 :             started_tx = true;
     422              :         }
     423              : 
     424              :         Assert(get_rel_relkind(rstate->relid) != RELKIND_SEQUENCE);
     425              : 
     426         1765 :         if (rstate->state == SUBREL_STATE_SYNCDONE)
     427              :         {
     428              :             /*
     429              :              * Apply has caught up to the position where the table sync has
     430              :              * finished.  Mark the table as ready so that the apply will just
     431              :              * continue to replicate it normally.
     432              :              */
     433          191 :             if (current_lsn >= rstate->lsn)
     434              :             {
     435              :                 char        originname[NAMEDATALEN];
     436              : 
     437          191 :                 rstate->state = SUBREL_STATE_READY;
     438          191 :                 rstate->lsn = current_lsn;
     439              : 
     440              :                 /*
     441              :                  * Remove the tablesync origin tracking if exists.
     442              :                  *
     443              :                  * There is a chance that the user is concurrently performing
     444              :                  * refresh for the subscription where we remove the table
     445              :                  * state and its origin or the tablesync worker would have
     446              :                  * already removed this origin. We can't rely on tablesync
     447              :                  * worker to remove the origin tracking as if there is any
     448              :                  * error while dropping we won't restart it to drop the
     449              :                  * origin. So passing missing_ok = true.
     450              :                  *
     451              :                  * Lock the subscription and origin in the same order as we
     452              :                  * are doing during DDL commands to avoid deadlocks. See
     453              :                  * AlterSubscription_refresh.
     454              :                  */
     455          191 :                 LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
     456              :                                  0, AccessShareLock);
     457              : 
     458          191 :                 if (!rel)
     459          185 :                     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     460              : 
     461          191 :                 ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
     462              :                                                    rstate->relid,
     463              :                                                    originname,
     464              :                                                    sizeof(originname));
     465          191 :                 replorigin_drop_by_name(originname, true, false);
     466              : 
     467              :                 /*
     468              :                  * Update the state to READY only after the origin cleanup.
     469              :                  */
     470          191 :                 UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
     471          191 :                                            rstate->relid, rstate->state,
     472              :                                            rstate->lsn, true);
     473              :             }
     474              :         }
     475              :         else
     476              :         {
     477              :             LogicalRepWorker *syncworker;
     478              : 
     479              :             /*
     480              :              * Look for a sync worker for this relation.
     481              :              */
     482         1574 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     483              : 
     484         1574 :             syncworker = logicalrep_worker_find(WORKERTYPE_TABLESYNC,
     485         1574 :                                                 MyLogicalRepWorker->subid,
     486              :                                                 rstate->relid, false);
     487              : 
     488         1574 :             if (syncworker)
     489              :             {
     490              :                 /* Found one, update our copy of its state */
     491          716 :                 SpinLockAcquire(&syncworker->relmutex);
     492          716 :                 rstate->state = syncworker->relstate;
     493          716 :                 rstate->lsn = syncworker->relstate_lsn;
     494          716 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
     495              :                 {
     496              :                     /*
     497              :                      * Sync worker is waiting for apply.  Tell sync worker it
     498              :                      * can catchup now.
     499              :                      */
     500          192 :                     syncworker->relstate = SUBREL_STATE_CATCHUP;
     501          192 :                     syncworker->relstate_lsn =
     502          192 :                         Max(syncworker->relstate_lsn, current_lsn);
     503              :                 }
     504          716 :                 SpinLockRelease(&syncworker->relmutex);
     505              : 
     506              :                 /* If we told worker to catch up, wait for it. */
     507          716 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
     508              :                 {
     509              :                     /* Signal the sync worker, as it may be waiting for us. */
     510          192 :                     if (syncworker->proc)
     511          192 :                         logicalrep_worker_wakeup_ptr(syncworker);
     512              : 
     513              :                     /* Now safe to release the LWLock */
     514          192 :                     LWLockRelease(LogicalRepWorkerLock);
     515              : 
     516          192 :                     if (started_tx)
     517              :                     {
     518              :                         /*
     519              :                          * We must commit the existing transaction to release
     520              :                          * the existing locks before entering a busy loop.
     521              :                          * This is required to avoid any undetected deadlocks
     522              :                          * due to any existing lock, as the deadlock detector
     523              :                          * won't be able to detect waits on the latch.
     524              :                          *
     525              :                          * Also close any tables prior to the commit.
     526              :                          */
     527          192 :                         if (rel)
     528              :                         {
     529           29 :                             table_close(rel, NoLock);
     530           29 :                             rel = NULL;
     531              :                         }
     532          192 :                         CommitTransactionCommand();
     533          192 :                         pgstat_report_stat(false);
     534              :                     }
     535              : 
     536              :                     /*
     537              :                      * Enter busy loop and wait for synchronization worker to
     538              :                      * reach expected state (or die trying).
     539              :                      */
     540          192 :                     StartTransactionCommand();
     541          192 :                     started_tx = true;
     542              : 
     543          192 :                     wait_for_table_state_change(rstate->relid,
     544              :                                                 SUBREL_STATE_SYNCDONE);
     545              :                 }
     546              :                 else
     547          524 :                     LWLockRelease(LogicalRepWorkerLock);
     548              :             }
     549              :             else
     550              :             {
     551              :                 /*
     552              :                  * If there is no sync worker for this table yet, count
     553              :                  * running sync workers for this subscription, while we have
     554              :                  * the lock.
     555              :                  */
     556              :                 int         nsyncworkers =
     557          858 :                     logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
     558              :                 struct tablesync_start_time_mapping *hentry;
     559              :                 bool        found;
     560              : 
     561              :                 /* Now safe to release the LWLock */
     562          858 :                 LWLockRelease(LogicalRepWorkerLock);
     563              : 
     564          858 :                 hentry = hash_search(last_start_times, &rstate->relid,
     565              :                                      HASH_ENTER, &found);
     566          858 :                 if (!found)
     567          199 :                     hentry->last_start_time = 0;
     568              : 
     569          858 :                 launch_sync_worker(WORKERTYPE_TABLESYNC, nsyncworkers,
     570              :                                    rstate->relid, &hentry->last_start_time);
     571              :             }
     572              :         }
     573              :     }
     574              : 
     575              :     /* Close table if opened */
     576         4336 :     if (rel)
     577          156 :         table_close(rel, NoLock);
     578              : 
     579              : 
     580         4336 :     if (started_tx)
     581              :     {
     582              :         /*
     583              :          * Even when the two_phase mode is requested by the user, it remains
     584              :          * as 'pending' until all tablesyncs have reached READY state.
     585              :          *
     586              :          * When this happens, we restart the apply worker and (if the
     587              :          * conditions are still ok) then the two_phase tri-state will become
     588              :          * 'enabled' at that time.
     589              :          *
     590              :          * Note: If the subscription has no tables then leave the state as
     591              :          * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
     592              :          * work.
     593              :          */
     594         1106 :         if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
     595              :         {
     596           40 :             CommandCounterIncrement();  /* make updates visible */
     597           40 :             if (AllTablesyncsReady())
     598              :             {
     599            6 :                 ereport(LOG,
     600              :                         (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
     601              :                                 MySubscription->name)));
     602            6 :                 should_exit = true;
     603              :             }
     604              :         }
     605              : 
     606         1106 :         CommitTransactionCommand();
     607         1106 :         pgstat_report_stat(true);
     608              :     }
     609              : 
     610         4336 :     if (should_exit)
     611              :     {
     612              :         /*
     613              :          * Reset the last-start time for this worker so that the launcher will
     614              :          * restart it without waiting for wal_retrieve_retry_interval.
     615              :          */
     616            6 :         ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
     617              : 
     618            6 :         proc_exit(0);
     619              :     }
     620         4330 : }
     621              : 
     622              : /*
     623              :  * Create list of columns for COPY based on logical relation mapping.
     624              :  */
     625              : static List *
     626          206 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
     627              : {
     628          206 :     List       *attnamelist = NIL;
     629              :     int         i;
     630              : 
     631          546 :     for (i = 0; i < rel->remoterel.natts; i++)
     632              :     {
     633          340 :         attnamelist = lappend(attnamelist,
     634          340 :                               makeString(rel->remoterel.attnames[i]));
     635              :     }
     636              : 
     637              : 
     638          206 :     return attnamelist;
     639              : }
     640              : 
     641              : /*
     642              :  * Data source callback for the COPY FROM, which reads from the remote
     643              :  * connection and passes the data back to our local COPY.
     644              :  */
     645              : static int
     646        15061 : copy_read_data(void *outbuf, int minread, int maxread)
     647              : {
     648        15061 :     int         bytesread = 0;
     649              :     int         avail;
     650              : 
     651              :     /* If there is some leftover data from a previous read, use it. */
     652        15061 :     avail = copybuf->len - copybuf->cursor;
     653        15061 :     if (avail)
     654              :     {
     655            0 :         if (avail > maxread)
     656            0 :             avail = maxread;
     657            0 :         memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
     658            0 :         copybuf->cursor += avail;
     659            0 :         maxread -= avail;
     660            0 :         bytesread += avail;
     661              :     }
     662              : 
     663        15063 :     while (maxread > 0 && bytesread < minread)
     664              :     {
     665        15063 :         pgsocket    fd = PGINVALID_SOCKET;
     666              :         int         len;
     667        15063 :         char       *buf = NULL;
     668              : 
     669              :         for (;;)
     670              :         {
     671              :             /* Try to read the data. */
     672        15063 :             len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
     673              : 
     674        15063 :             CHECK_FOR_INTERRUPTS();
     675              : 
     676        15063 :             if (len == 0)
     677            2 :                 break;
     678        15061 :             else if (len < 0)
     679        15061 :                 return bytesread;
     680              :             else
     681              :             {
     682              :                 /* Process the data */
     683        14857 :                 copybuf->data = buf;
     684        14857 :                 copybuf->len = len;
     685        14857 :                 copybuf->cursor = 0;
     686              : 
     687        14857 :                 avail = copybuf->len - copybuf->cursor;
     688        14857 :                 if (avail > maxread)
     689            0 :                     avail = maxread;
     690        14857 :                 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
     691        14857 :                 outbuf = (char *) outbuf + avail;
     692        14857 :                 copybuf->cursor += avail;
     693        14857 :                 maxread -= avail;
     694        14857 :                 bytesread += avail;
     695              :             }
     696              : 
     697        14857 :             if (maxread <= 0 || bytesread >= minread)
     698        14857 :                 return bytesread;
     699              :         }
     700              : 
     701              :         /*
     702              :          * Wait for more data or latch.
     703              :          */
     704            2 :         (void) WaitLatchOrSocket(MyLatch,
     705              :                                  WL_SOCKET_READABLE | WL_LATCH_SET |
     706              :                                  WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     707              :                                  fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
     708              : 
     709            2 :         ResetLatch(MyLatch);
     710              :     }
     711              : 
     712            0 :     return bytesread;
     713              : }
     714              : 
     715              : 
     716              : /*
     717              :  * Get information about the remote relation, similar to what the RELATION
     718              :  * message provides during replication.
     719              :  *
     720              :  * This function also returns (a) the relation qualifications to be used in
     721              :  * the COPY command, and (b) whether the remote relation has published any
     722              :  * generated column.
     723              :  */
     724              : static void
     725          209 : fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
     726              :                         List **qual, bool *gencol_published)
     727              : {
     728              :     WalRcvExecResult *res;
     729              :     StringInfoData cmd;
     730              :     TupleTableSlot *slot;
     731          209 :     Oid         tableRow[] = {OIDOID, CHAROID, CHAROID};
     732          209 :     Oid         attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
     733          209 :     Oid         qualRow[] = {TEXTOID};
     734              :     bool        isnull;
     735              :     int         natt;
     736          209 :     StringInfo  pub_names = NULL;
     737          209 :     Bitmapset  *included_cols = NULL;
     738          209 :     int         server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
     739              : 
     740          209 :     lrel->nspname = nspname;
     741          209 :     lrel->relname = relname;
     742              : 
     743              :     /* First fetch Oid and replica identity. */
     744          209 :     initStringInfo(&cmd);
     745          209 :     appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
     746              :                      "  FROM pg_catalog.pg_class c"
     747              :                      "  INNER JOIN pg_catalog.pg_namespace n"
     748              :                      "        ON (c.relnamespace = n.oid)"
     749              :                      " WHERE n.nspname = %s"
     750              :                      "   AND c.relname = %s",
     751              :                      quote_literal_cstr(nspname),
     752              :                      quote_literal_cstr(relname));
     753          209 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
     754              :                       lengthof(tableRow), tableRow);
     755              : 
     756          209 :     if (res->status != WALRCV_OK_TUPLES)
     757            0 :         ereport(ERROR,
     758              :                 (errcode(ERRCODE_CONNECTION_FAILURE),
     759              :                  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
     760              :                         nspname, relname, res->err)));
     761              : 
     762          209 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     763          209 :     if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     764            1 :         ereport(ERROR,
     765              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     766              :                  errmsg("table \"%s.%s\" not found on publisher",
     767              :                         nspname, relname)));
     768              : 
     769          208 :     lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
     770              :     Assert(!isnull);
     771          208 :     lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
     772              :     Assert(!isnull);
     773          208 :     lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
     774              :     Assert(!isnull);
     775              : 
     776          208 :     ExecDropSingleTupleTableSlot(slot);
     777          208 :     walrcv_clear_result(res);
     778              : 
     779              : 
     780              :     /*
     781              :      * Get the column list for this relation (across the subscribed publications).
     782              :      *
     783              :      * We need to do this before fetching info about column names and types,
     784              :      * so that we can skip columns that should not be replicated.
     785              :      */
     786          208 :     if (server_version >= 150000)
     787              :     {
     788              :         WalRcvExecResult *pubres;
     789              :         TupleTableSlot *tslot;
     790          208 :         Oid         attrsRow[] = {INT2VECTOROID};
     791              : 
     792              :         /* Build the pub_names comma-separated string. */
     793          208 :         pub_names = makeStringInfo();
     794          208 :         GetPublicationsStr(MySubscription->publications, pub_names, true);
     795              : 
     796              :         /*
     797              :          * Fetch info about column lists for the relation (from all the
     798              :          * publications).
     799              :          */
     800          208 :         resetStringInfo(&cmd);
     801          208 :         appendStringInfo(&cmd,
     802              :                          "SELECT DISTINCT"
     803              :                          "  (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
     804              :                          "   THEN NULL ELSE gpt.attrs END)"
     805              :                          "  FROM pg_publication p,"
     806              :                          "  LATERAL pg_get_publication_tables(p.pubname) gpt,"
     807              :                          "  pg_class c"
     808              :                          " WHERE gpt.relid = %u AND c.oid = gpt.relid"
     809              :                          "   AND p.pubname IN ( %s )",
     810              :                          lrel->remoteid,
     811              :                          pub_names->data);
     812              : 
     813          208 :         pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
     814              :                              lengthof(attrsRow), attrsRow);
     815              : 
     816          208 :         if (pubres->status != WALRCV_OK_TUPLES)
     817            0 :             ereport(ERROR,
     818              :                     (errcode(ERRCODE_CONNECTION_FAILURE),
     819              :                      errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
     820              :                             nspname, relname, pubres->err)));
     821              : 
     822              :         /*
     823              :          * We don't support the case where the column list is different for
     824              :          * the same table when combining publications. See comments atop
     825              :          * fetch_relation_list. So there should be only one row returned.
     826              :          * Although we already checked this when creating the subscription, we
     827              :          * still need to check here in case the column list was changed after
     828              :          * creating the subscription and before the sync worker is started.
     829              :          */
     830          208 :         if (tuplestore_tuple_count(pubres->tuplestore) > 1)
     831            0 :             ereport(ERROR,
     832              :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     833              :                     errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
     834              :                            nspname, relname));
     835              : 
     836              :         /*
     837              :          * Get the column list and build a single bitmap with the attnums.
     838              :          *
     839              :          * If we find a NULL value, it means all the columns should be
     840              :          * replicated.
     841              :          */
     842          208 :         tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
     843          208 :         if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
     844              :         {
     845          208 :             Datum       cfval = slot_getattr(tslot, 1, &isnull);
     846              : 
     847          208 :             if (!isnull)
     848              :             {
     849              :                 ArrayType  *arr;
     850              :                 int         nelems;
     851              :                 int16      *elems;
     852              : 
     853           22 :                 arr = DatumGetArrayTypeP(cfval);
     854           22 :                 nelems = ARR_DIMS(arr)[0];
     855           22 :                 elems = (int16 *) ARR_DATA_PTR(arr);
     856              : 
     857           59 :                 for (natt = 0; natt < nelems; natt++)
     858           37 :                     included_cols = bms_add_member(included_cols, elems[natt]);
     859              :             }
     860              : 
     861          208 :             ExecClearTuple(tslot);
     862              :         }
     863          208 :         ExecDropSingleTupleTableSlot(tslot);
     864              : 
     865          208 :         walrcv_clear_result(pubres);
     866              :     }
     867              : 
     868              :     /*
     869              :      * Now fetch column names and types.
     870              :      */
     871          208 :     resetStringInfo(&cmd);
     872          208 :     appendStringInfoString(&cmd,
     873              :                            "SELECT a.attnum,"
     874              :                            "       a.attname,"
     875              :                            "       a.atttypid,"
     876              :                            "       a.attnum = ANY(i.indkey)");
     877              : 
     878              :     /* Generated columns can be replicated since version 18. */
     879          208 :     if (server_version >= 180000)
     880          208 :         appendStringInfoString(&cmd, ", a.attgenerated != ''");
     881              : 
     882          416 :     appendStringInfo(&cmd,
     883              :                      "  FROM pg_catalog.pg_attribute a"
     884              :                      "  LEFT JOIN pg_catalog.pg_index i"
     885              :                      "       ON (i.indexrelid = pg_get_replica_identity_index(%u))"
     886              :                      " WHERE a.attnum > 0::pg_catalog.int2"
     887              :                      "   AND NOT a.attisdropped %s"
     888              :                      "   AND a.attrelid = %u"
     889              :                      " ORDER BY a.attnum",
     890              :                      lrel->remoteid,
     891          208 :                      (server_version >= 120000 && server_version < 180000 ?
     892              :                       "AND a.attgenerated = ''" : ""),
     893              :                      lrel->remoteid);
     894          208 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
     895              :                       server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
     896              : 
     897          208 :     if (res->status != WALRCV_OK_TUPLES)
     898            0 :         ereport(ERROR,
     899              :                 (errcode(ERRCODE_CONNECTION_FAILURE),
     900              :                  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
     901              :                         nspname, relname, res->err)));
     902              : 
     903              :     /* We don't know the number of rows coming, so allocate enough space. */
     904          208 :     lrel->attnames = palloc0_array(char *, MaxTupleAttributeNumber);
     905          208 :     lrel->atttyps = palloc0_array(Oid, MaxTupleAttributeNumber);
     906          208 :     lrel->attkeys = NULL;
     907              : 
     908              :     /*
     909              :      * Store the columns as a list of names.  Ignore those that are not
     910              :      * present in the column list, if there is one.
     911              :      */
     912          208 :     natt = 0;
     913          208 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     914          585 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     915              :     {
     916              :         char       *rel_colname;
     917              :         AttrNumber  attnum;
     918              : 
     919          377 :         attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
     920              :         Assert(!isnull);
     921              : 
     922              :         /* If the column is not in the column list, skip it. */
     923          377 :         if (included_cols != NULL && !bms_is_member(attnum, included_cols))
     924              :         {
     925           31 :             ExecClearTuple(slot);
     926           31 :             continue;
     927              :         }
     928              : 
     929          346 :         rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
     930              :         Assert(!isnull);
     931              : 
     932          346 :         lrel->attnames[natt] = rel_colname;
     933          346 :         lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
     934              :         Assert(!isnull);
     935              : 
     936          346 :         if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
     937          111 :             lrel->attkeys = bms_add_member(lrel->attkeys, natt);
     938              : 
     939              :         /* Remember if the remote table has published any generated column. */
     940          346 :         if (server_version >= 180000 && !(*gencol_published))
     941              :         {
     942          346 :             *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
     943              :             Assert(!isnull);
     944              :         }
     945              : 
     946              :         /* Should never happen. */
     947          346 :         if (++natt >= MaxTupleAttributeNumber)
     948            0 :             elog(ERROR, "too many columns in remote table \"%s.%s\"",
     949              :                  nspname, relname);
     950              : 
     951          346 :         ExecClearTuple(slot);
     952              :     }
     953          208 :     ExecDropSingleTupleTableSlot(slot);
     954              : 
     955          208 :     lrel->natts = natt;
     956              : 
     957          208 :     walrcv_clear_result(res);
     958              : 
     959              :     /*
     960              :      * Get the relation's row filter expressions. DISTINCT prevents the same
     961              :      * expression of a table in multiple publications from being included
     962              :      * multiple times in the final expression.
     963              :      *
     964              :      * We need to copy the row even if it matches just one of the
     965              :      * publications, so we later combine all the quals with OR.
     966              :      *
     967              :      * For initial synchronization, row filtering can be ignored in the
     968              :      * following cases:
     969              :      *
     970              :      * 1) one of the subscribed publications for the table hasn't specified
     971              :      * any row filter
     972              :      *
     973              :      * 2) one of the subscribed publications has puballtables set to true
     974              :      *
     975              :      * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
     976              :      * that includes this relation
     977              :      */
     978          208 :     if (server_version >= 150000)
     979              :     {
     980              :         /* Reuse the already-built pub_names. */
     981              :         Assert(pub_names != NULL);
     982              : 
     983              :         /* Check for row filters. */
     984          208 :         resetStringInfo(&cmd);
     985          208 :         appendStringInfo(&cmd,
     986              :                          "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
     987              :                          "  FROM pg_publication p,"
     988              :                          "  LATERAL pg_get_publication_tables(p.pubname) gpt"
     989              :                          " WHERE gpt.relid = %u"
     990              :                          "   AND p.pubname IN ( %s )",
     991              :                          lrel->remoteid,
     992              :                          pub_names->data);
     993              : 
     994          208 :         res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
     995              : 
     996          208 :         if (res->status != WALRCV_OK_TUPLES)
     997            0 :             ereport(ERROR,
     998              :                     (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
     999              :                             nspname, relname, res->err)));
    1000              : 
    1001              :         /*
    1002              :          * Multiple row filter expressions for the same table will be combined
    1003              :          * by COPY using OR. If any of the filter expressions for this table
    1004              :          * are null, it means the whole table will be copied. In this case it
    1005              :          * is not necessary to construct a unified row filter expression at
    1006              :          * all.
    1007              :          */
    1008          208 :         slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    1009          223 :         while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    1010              :         {
    1011          212 :             Datum       rf = slot_getattr(slot, 1, &isnull);
    1012              : 
    1013          212 :             if (!isnull)
    1014           15 :                 *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
    1015              :             else
    1016              :             {
    1017              :                 /* Ignore filters and cleanup as necessary. */
    1018          197 :                 if (*qual)
    1019              :                 {
    1020            3 :                     list_free_deep(*qual);
    1021            3 :                     *qual = NIL;
    1022              :                 }
    1023          197 :                 break;
    1024              :             }
    1025              : 
    1026           15 :             ExecClearTuple(slot);
    1027              :         }
    1028          208 :         ExecDropSingleTupleTableSlot(slot);
    1029              : 
    1030          208 :         walrcv_clear_result(res);
    1031          208 :         destroyStringInfo(pub_names);
    1032              :     }
    1033              : 
    1034          208 :     pfree(cmd.data);
    1035          208 : }
    1036              : 
    1037              : /*
    1038              :  * Copy the existing data of a table from the publisher into 'rel'.
    1039              :  *
    1040              :  * Caller is responsible for locking the local relation.
    1041              :  */
    1042              : static void
    1043          209 : copy_table(Relation rel)
    1044              : {
    1045              :     LogicalRepRelMapEntry *relmapentry;
    1046              :     LogicalRepRelation lrel;
    1047          209 :     List       *qual = NIL;
    1048              :     WalRcvExecResult *res;
    1049              :     StringInfoData cmd;
    1050              :     CopyFromState cstate;
    1051              :     List       *attnamelist;
    1052              :     ParseState *pstate;
    1053          209 :     List       *options = NIL;
    1054          209 :     bool        gencol_published = false;
    1055              : 
    1056              :     /* Get the publisher relation info, row filters and generated-column flag. */
    1057          209 :     fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
    1058          209 :                             RelationGetRelationName(rel), &lrel, &qual,
    1059              :                             &gencol_published);
    1060              : 
    1061              :     /* Put the relation into relmap. */
    1062          208 :     logicalrep_relmap_update(&lrel);
    1063              : 
    1064              :     /* Map the publisher relation to the local one. */
    1065          208 :     relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
    1066              :     Assert(rel == relmapentry->localrel);
    1067              : 
    1068              :     /* Build the COPY command that starts the copy on the publisher. */
    1069          206 :     initStringInfo(&cmd);
    1070              : 
    1071              :     /* Regular or partitioned table with no row filter and no published generated columns */
    1072          206 :     if ((lrel.relkind == RELKIND_RELATION || lrel.relkind == RELKIND_PARTITIONED_TABLE)
    1073          206 :         && qual == NIL && !gencol_published)
    1074              :     {
    1075          192 :         appendStringInfo(&cmd, "COPY %s",
    1076          192 :                          quote_qualified_identifier(lrel.nspname, lrel.relname));
    1077              : 
    1078              :         /* If the table has columns, then specify the columns */
    1079          192 :         if (lrel.natts)
    1080              :         {
    1081          191 :             appendStringInfoString(&cmd, " (");
    1082              : 
    1083              :             /*
    1084              :              * XXX Do we need to list the columns in all cases? Maybe we're
    1085              :              * replicating all columns?
    1086              :              */
    1087          510 :             for (int i = 0; i < lrel.natts; i++)
    1088              :             {
    1089          319 :                 if (i > 0)
    1090          128 :                     appendStringInfoString(&cmd, ", ");
    1091              : 
    1092          319 :                 appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
    1093              :             }
    1094              : 
    1095          191 :             appendStringInfoChar(&cmd, ')');
    1096              :         }
    1097              : 
    1098          192 :         appendStringInfoString(&cmd, " TO STDOUT");
    1099              :     }
    1100              :     else
    1101              :     {
    1102              :         /*
    1103              :          * For non-tables and tables with row filters, we need to do COPY
    1104              :          * (SELECT ...), but we can't just do SELECT * because we may need to
    1105              :          * copy only a subset of columns including generated columns. For tables
    1106              :          * with any row filters, build a SELECT query with OR'ed row filters
    1107              :          * for COPY.
    1108              :          *
    1109              :          * We also need to use this same COPY (SELECT ...) syntax when
    1110              :          * generated columns are published, because copy of generated columns
    1111              :          * is not supported by the normal COPY.
    1112              :          */
    1113           14 :         appendStringInfoString(&cmd, "COPY (SELECT ");
    1114           35 :         for (int i = 0; i < lrel.natts; i++)
    1115              :         {
    1116           21 :             appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
    1117           21 :             if (i < lrel.natts - 1)
    1118            7 :                 appendStringInfoString(&cmd, ", ");
    1119              :         }
    1120              : 
    1121           14 :         appendStringInfoString(&cmd, " FROM ");
    1122              : 
    1123              :         /*
    1124              :          * For regular tables, make sure we don't copy data from a child that
    1125              :          * inherits the named table as those will be copied separately.
    1126              :          */
    1127           14 :         if (lrel.relkind == RELKIND_RELATION)
    1128           11 :             appendStringInfoString(&cmd, "ONLY ");
    1129              : 
    1130           14 :         appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
    1131              :         /* Append the row filters as a WHERE clause, OR'ed together, then free the list. */
    1132           14 :         if (qual != NIL)
    1133              :         {
    1134              :             ListCell   *lc;
    1135           11 :             char       *q = strVal(linitial(qual));
    1136              : 
    1137           11 :             appendStringInfo(&cmd, " WHERE %s", q);
    1138           12 :             for_each_from(lc, qual, 1)
    1139              :             {
    1140            1 :                 q = strVal(lfirst(lc));
    1141            1 :                 appendStringInfo(&cmd, " OR %s", q);
    1142              :             }
    1143           11 :             list_free_deep(qual);
    1144              :         }
    1145              : 
    1146           14 :         appendStringInfoString(&cmd, ") TO STDOUT");
    1147              :     }
    1148              : 
    1149              :     /*
    1150              :      * Request binary format only if the publisher is v16 or later; for older
    1151              :      * publishers the initial sync uses text format even if binary is enabled.
    1152              :      */
    1153          206 :     if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
    1154          206 :         MySubscription->binary)
    1155              :     {
    1156            5 :         appendStringInfoString(&cmd, " WITH (FORMAT binary)");
    1157            5 :         options = list_make1(makeDefElem("format",
    1158              :                                          (Node *) makeString("binary"), -1));
    1159              :     }
    1160              : 
    1161          206 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
    1162          206 :     pfree(cmd.data);
    1163          206 :     if (res->status != WALRCV_OK_COPY_OUT)
    1164            0 :         ereport(ERROR,
    1165              :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1166              :                  errmsg("could not start initial contents copy for table \"%s.%s\": %s",
    1167              :                         lrel.nspname, lrel.relname, res->err)));
    1168          206 :     walrcv_clear_result(res);
    1169              : 
    1170          206 :     copybuf = makeStringInfo();
    1171              : 
    1172          206 :     pstate = make_parsestate(NULL);
    1173          206 :     (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
    1174              :                                          NULL, false, false);
    1175              : 
    1176          206 :     attnamelist = make_copy_attnamelist(relmapentry);
    1177          206 :     cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
    1178              : 
    1179              :     /* Do the copy into the local relation. */
    1180          205 :     (void) CopyFrom(cstate);
    1181              : 
    1182          194 :     logicalrep_rel_close(relmapentry, NoLock);
    1183          194 : }
    1184              : 
    1185              : /*
    1186              :  * Determine the tablesync slot name.
    1187              :  *
    1188              :  * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
    1189              :  * on slot name length. We append the system identifier to avoid slot name
    1190              :  * collisions with subscriptions in other clusters. With the current scheme
    1191              :  * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
    1192              :  * length of slot_name will be 50.
    1193              :  *
    1194              :  * The returned slot name is stored in the supplied buffer (syncslotname) with
    1195              :  * the given size (szslot).
    1196              :  *
    1197              :  * Note: We don't use the subscription slot name as part of the tablesync slot
    1198              :  * name because we are responsible for cleaning up these slots and it could become
    1199              :  * impossible to recalculate what name to clean up if the subscription slot name
    1200              :  * had changed.
    1201              :  */
    1202              : void
    1203          408 : ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
    1204              :                                 char *syncslotname, Size szslot)
    1205              : {
    1206          408 :     snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
    1207              :              relid, GetSystemIdentifier());
    1208          408 : }
    1209              : 
    1210              : /*
    1211              :  * Start syncing the table in the sync worker.
    1212              :  *
    1213              :  * If nothing needs to be done to sync the table, we exit the worker without
    1214              :  * any further action.
    1215              :  *
    1216              :  * The returned slot name is palloc'ed in the current memory context.
    1217              :  */
    1218              : static char *
    1219          210 : LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
    1220              : {
    1221              :     char       *slotname;
    1222              :     char       *err;
    1223              :     char        relstate;
    1224              :     XLogRecPtr  relstate_lsn;
    1225              :     Relation    rel;
    1226              :     AclResult   aclresult;
    1227              :     WalRcvExecResult *res;
    1228              :     char        originname[NAMEDATALEN];
    1229              :     ReplOriginId originid;
    1230              :     UserContext ucxt;
    1231              :     bool        must_use_password;
    1232              :     bool        run_as_owner;
    1233              : 
    1234              :     /* Check the state of the table synchronization. */
    1235          210 :     StartTransactionCommand();
    1236          210 :     relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
    1237          210 :                                        MyLogicalRepWorker->relid,
    1238              :                                        &relstate_lsn);
    1239          210 :     CommitTransactionCommand();
    1240              : 
    1241              :     /* Is the use of a password mandatory? */
    1242          415 :     must_use_password = MySubscription->passwordrequired &&
    1243          205 :         !MySubscription->ownersuperuser;
    1244              : 
    1245          210 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
    1246          210 :     MyLogicalRepWorker->relstate = relstate;
    1247          210 :     MyLogicalRepWorker->relstate_lsn = relstate_lsn;
    1248          210 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
    1249              : 
    1250              :     /*
    1251              :      * If synchronization is already done or no longer necessary, exit now
    1252              :      * that we've updated shared memory state.
    1253              :      */
    1254          210 :     switch (relstate)
    1255              :     {
    1256            0 :         case SUBREL_STATE_SYNCDONE:
    1257              :         case SUBREL_STATE_READY:
    1258              :         case SUBREL_STATE_UNKNOWN:
    1259            0 :             FinishSyncWorker(); /* doesn't return */
    1260              :     }
    1261              : 
    1262              :     /* Calculate the name of the tablesync slot. */
    1263          210 :     slotname = (char *) palloc(NAMEDATALEN);
    1264          210 :     ReplicationSlotNameForTablesync(MySubscription->oid,
    1265          210 :                                     MyLogicalRepWorker->relid,
    1266              :                                     slotname,
    1267              :                                     NAMEDATALEN);
    1268              : 
    1269              :     /*
    1270              :      * Here we use the slot name instead of the subscription name as the
    1271              :      * application_name, so that it is different from the leader apply worker,
    1272              :      * so that synchronous replication can distinguish them.
    1273              :      */
    1274          210 :     LogRepWorkerWalRcvConn =
    1275          210 :         walrcv_connect(MySubscription->conninfo, true, true,
    1276              :                        must_use_password,
    1277              :                        slotname, &err);
    1278          210 :     if (LogRepWorkerWalRcvConn == NULL)
    1279            0 :         ereport(ERROR,
    1280              :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1281              :                  errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
    1282              :                         MySubscription->name, err)));
    1283              : 
    1284              :     Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
    1285              :            MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
    1286              :            MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
    1287              : 
    1288              :     /* Assign the origin tracking record name. */
    1289          210 :     ReplicationOriginNameForLogicalRep(MySubscription->oid,
    1290          210 :                                        MyLogicalRepWorker->relid,
    1291              :                                        originname,
    1292              :                                        sizeof(originname));
    1293              : 
    1294          210 :     if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
    1295              :     {
    1296              :         /*
    1297              :          * We have previously errored out before finishing the copy so the
    1298              :          * replication slot might exist. We want to remove the slot if it
    1299              :          * already exists and proceed.
    1300              :          *
    1301              :          * XXX We could instead try to drop the slot at the time of the failure,
    1302              :          * but for that we might need to clean up the copy state, as it might
    1303              :          * be in the middle of fetching the rows. Also, if there is a network
    1304              :          * breakdown then the drop wouldn't have succeeded, so trying it next
    1305              :          * time seems like a better bet.
    1306              :          */
    1307           11 :         ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
    1308              :     }
    1309          199 :     else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
    1310              :     {
    1311              :         /*
    1312              :          * The COPY phase was previously done, but tablesync then crashed
    1313              :          * before it was able to finish normally.
    1314              :          */
    1315            0 :         StartTransactionCommand();
    1316              : 
    1317              :         /*
    1318              :          * The origin tracking name must already exist. It was created the
    1319              :          * first time this tablesync was launched.
    1320              :          */
    1321            0 :         originid = replorigin_by_name(originname, false);
    1322            0 :         replorigin_session_setup(originid, 0);
    1323            0 :         replorigin_xact_state.origin = originid;
    1324            0 :         *origin_startpos = replorigin_session_get_progress(false);
    1325              : 
    1326            0 :         CommitTransactionCommand();
    1327              : 
    1328            0 :         goto copy_table_done;
    1329              :     }
    1330              : 
    1331          210 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
    1332          210 :     MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
    1333          210 :     MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
    1334          210 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
    1335              : 
    1336              :     /*
    1337              :      * Update the state, create the replication origin, and make them visible
    1338              :      * to others.
    1339              :      */
    1340          210 :     StartTransactionCommand();
    1341          210 :     UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
    1342          210 :                                MyLogicalRepWorker->relid,
    1343          210 :                                MyLogicalRepWorker->relstate,
    1344          210 :                                MyLogicalRepWorker->relstate_lsn,
    1345              :                                false);
    1346              : 
    1347              :     /*
    1348              :      * Create the replication origin in a separate transaction from the one
    1349              :      * that sets up the origin in shared memory. This prevents the risk that
    1350              :      * changes to the origin in shared memory cannot be rolled back if the
    1351              :      * transaction aborts.
    1352              :      */
    1353          209 :     originid = replorigin_by_name(originname, true);
    1354          209 :     if (!OidIsValid(originid))
    1355          198 :         originid = replorigin_create(originname);
    1356              : 
    1357          209 :     CommitTransactionCommand();
    1358          209 :     pgstat_report_stat(true);
    1359              : 
    1360          209 :     StartTransactionCommand();
    1361              : 
    1362              :     /*
    1363              :      * Use a standard write lock here. It might be better to disallow access
    1364              :      * to the table while it's being synchronized. But we don't want to block
    1365              :      * the main apply process from working and it has to open the relation in
    1366              :      * RowExclusiveLock mode when remapping remote relation id to local one.
    1367              :      */
    1368          209 :     rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
    1369              : 
    1370              :     /*
    1371              :      * Start a transaction in the remote node in REPEATABLE READ mode.  This
    1372              :      * ensures that both the replication slot we create (see below) and the
    1373              :      * COPY are consistent with each other.
    1374              :      */
    1375          209 :     res = walrcv_exec(LogRepWorkerWalRcvConn,
    1376              :                       "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
    1377              :                       0, NULL);
    1378          209 :     if (res->status != WALRCV_OK_COMMAND)
    1379            0 :         ereport(ERROR,
    1380              :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1381              :                  errmsg("table copy could not start transaction on publisher: %s",
    1382              :                         res->err)));
    1383          209 :     walrcv_clear_result(res);
    1384              : 
    1385              :     /*
    1386              :      * Create a new permanent logical decoding slot. This slot will be used
    1387              :      * for the catchup phase after COPY is done, so tell it to use the
    1388              :      * snapshot to make the final data consistent.
    1389              :      */
    1390          209 :     walrcv_create_slot(LogRepWorkerWalRcvConn,
    1391              :                        slotname, false /* permanent */ , false /* two_phase */ ,
    1392              :                        MySubscription->failover,
    1393              :                        CRS_USE_SNAPSHOT, origin_startpos);
    1394              : 
    1395              :     /*
    1396              :      * Advance the origin to the LSN obtained from walrcv_create_slot and then
    1397              :      * set up the origin. The advancement is WAL logged for the purpose of
    1398              :      * recovery. Locks are to prevent the replication origin from vanishing
    1399              :      * while advancing.
    1400              :      *
    1401              :      * The purpose of doing these before the copy is to avoid doing the copy
    1402              :      * again due to any error in advancing or setting up origin tracking.
    1403              :      */
    1404          209 :     LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1405          209 :     replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
    1406              :                        true /* go backward */ , true /* WAL log */ );
    1407          209 :     UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1408              : 
    1409          209 :     replorigin_session_setup(originid, 0);
    1410          209 :     replorigin_xact_state.origin = originid;
    1411              : 
    1412              :     /*
    1413              :      * If the user did not opt to run as the owner of the subscription
    1414              :      * ('run_as_owner'), then copy the table as the owner of the table.
    1415              :      */
    1416          209 :     run_as_owner = MySubscription->runasowner;
    1417          209 :     if (!run_as_owner)
    1418          208 :         SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);
    1419              : 
    1420              :     /*
    1421              :      * Check that our table sync worker has permission to insert into the
    1422              :      * target table.
    1423              :      */
    1424          209 :     aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
    1425              :                                   ACL_INSERT);
    1426          209 :     if (aclresult != ACLCHECK_OK)
    1427            0 :         aclcheck_error(aclresult,
    1428            0 :                        get_relkind_objtype(rel->rd_rel->relkind),
    1429            0 :                        RelationGetRelationName(rel));
    1430              : 
    1431              :     /*
    1432              :      * COPY FROM does not honor RLS policies.  That is not a problem for
    1433              :      * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
    1434              :      * who has it implicitly), but other roles should not be able to
    1435              :      * circumvent RLS.  Disallow logical replication into RLS enabled
    1436              :      * relations for such roles.
    1437              :      */
    1438          209 :     if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
    1439            0 :         ereport(ERROR,
    1440              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1441              :                  errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
    1442              :                         GetUserNameFromId(GetUserId(), true),
    1443              :                         RelationGetRelationName(rel))));
    1444              : 
    1445              :     /* Now do the initial data copy */
    1446          209 :     PushActiveSnapshot(GetTransactionSnapshot());
    1447          209 :     copy_table(rel);
    1448          194 :     PopActiveSnapshot();
    1449              : 
    1450          194 :     res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
    1451          194 :     if (res->status != WALRCV_OK_COMMAND)
    1452            0 :         ereport(ERROR,
    1453              :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1454              :                  errmsg("table copy could not finish transaction on publisher: %s",
    1455              :                         res->err)));
    1456          194 :     walrcv_clear_result(res);
    1457              : 
    1458          194 :     if (!run_as_owner)
    1459          193 :         RestoreUserContext(&ucxt);
    1460              : 
    1461          194 :     table_close(rel, NoLock);
    1462              : 
    1463              :     /* Make the copy visible. */
    1464          194 :     CommandCounterIncrement();
    1465              : 
    1466              :     /*
    1467              :      * Update the persisted state to indicate the COPY phase is done; make it
    1468              :      * visible to others.
    1469              :      */
    1470          194 :     UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
    1471          194 :                                MyLogicalRepWorker->relid,
    1472              :                                SUBREL_STATE_FINISHEDCOPY,
    1473          194 :                                MyLogicalRepWorker->relstate_lsn,
    1474              :                                false);
    1475              : 
    1476          194 :     CommitTransactionCommand();
    1477              : 
    1478          194 : copy_table_done:
    1479              : 
    1480          194 :     elog(DEBUG1,
    1481              :          "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X",
    1482              :          originname, LSN_FORMAT_ARGS(*origin_startpos));
    1483              : 
    1484              :     /*
    1485              :      * We are done with the initial data synchronization, update the state.
    1486              :      */
    1487          194 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
    1488          194 :     MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
    1489          194 :     MyLogicalRepWorker->relstate_lsn = *origin_startpos;
    1490          194 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
    1491              : 
    1492              :     /*
    1493              :      * Finally, wait until the leader apply worker tells us to catch up and
    1494              :      * then return to let LogicalRepApplyLoop do it.
    1495              :      */
    1496          194 :     wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
    1497          194 :     return slotname;
    1498              : }
    1499              : 
    1500              : /*
    1501              :  * Execute the initial sync with error handling. On error, disable the
    1502              :  * subscription if it has disableonerr set; otherwise report and re-throw.
    1503              :  *
    1504              :  * On success, *slotname is set to a copy of the slot name allocated in
    1505              :  * ApplyContext. Note that we don't handle FATAL errors, which are probably
    1506              :  * caused by system resource errors and are not repeatable.
    1507              :  */
    1508              : static void
    1509          210 : start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
    1510              : {
    1511          210 :     char       *sync_slotname = NULL;
    1512              : 
    1513              :     Assert(am_tablesync_worker());
    1514              : 
    1515          210 :     PG_TRY();
    1516              :     {
    1517              :         /* Call initial sync. */
    1518          210 :         sync_slotname = LogicalRepSyncTableStart(origin_startpos);
    1519              :     }
    1520           15 :     PG_CATCH();
    1521              :     {
    1522           15 :         if (MySubscription->disableonerr)
    1523            1 :             DisableSubscriptionAndExit();
    1524              :         else
    1525              :         {
    1526              :             /*
    1527              :              * Report the worker failed during table synchronization. Abort
    1528              :              * the current transaction so that the stats message is sent in an
    1529              :              * idle state.
    1530              :              */
    1531           14 :             AbortOutOfAnyTransaction();
    1532           14 :             pgstat_report_subscription_error(MySubscription->oid);
    1533              : 
    1534           14 :             PG_RE_THROW();
    1535              :         }
    1536              :     }
    1537          194 :     PG_END_TRY();
    1538              : 
    1539              :     /* Copy the slot name into the long-lived ApplyContext and free the original. */
    1540          194 :     *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
    1541          194 :     pfree(sync_slotname);
    1542          194 : }
    1543              : 
    1544              : /*
    1545              :  * Runs the tablesync worker.
    1546              :  *
    1547              :  * It starts syncing tables. After a successful sync, sets streaming options
    1548              :  * and starts streaming to catch up with the apply worker.
    1549              :  */
    1550              : static void
    1551          210 : run_tablesync_worker(void)
    1552              : {
    1553              :     char        originname[NAMEDATALEN];
    1554          210 :     XLogRecPtr  origin_startpos = InvalidXLogRecPtr;
    1555          210 :     char       *slotname = NULL;
    1556              :     WalRcvStreamOptions options;
    1557              : 
    1558          210 :     start_table_sync(&origin_startpos, &slotname);
    1559              : 
    1560          194 :     ReplicationOriginNameForLogicalRep(MySubscription->oid,
    1561          194 :                                        MyLogicalRepWorker->relid,
    1562              :                                        originname,
    1563              :                                        sizeof(originname));
    1564              : 
    1565          194 :     set_apply_error_context_origin(originname);
    1566              : 
    1567          194 :     set_stream_options(&options, slotname, &origin_startpos);
    1568              : 
    1569          194 :     walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
    1570              : 
    1571              :     /* Apply the changes till we catch up with the apply worker. */
    1572          194 :     start_apply(origin_startpos);
    1573            0 : }
    1574              : 
    1575              : /* Logical Replication Tablesync worker entry point; main_arg carries the worker slot number. */
    1576              : void
    1577          210 : TableSyncWorkerMain(Datum main_arg)
    1578              : {
    1579          210 :     int         worker_slot = DatumGetInt32(main_arg);
    1580              : 
    1581          210 :     SetupApplyOrSyncWorker(worker_slot);
    1582              : 
    1583          210 :     run_tablesync_worker();
    1584              : 
    1585            0 :     FinishSyncWorker();
    1586              : }
    1587              : 
    1588              : /*
    1589              :  * If the subscription has no tables then return false.
    1590              :  *
    1591              :  * Otherwise, are all tablesyncs READY?
    1592              :  *
    1593              :  * Note: This function is not suitable to be called from outside of apply or
    1594              :  * tablesync workers because MySubscription needs to be already initialized.
    1595              :  */
    1596              : bool
    1597          221 : AllTablesyncsReady(void)
    1598              : {
    1599              :     bool        started_tx;
    1600              :     bool        has_tables;
    1601              : 
    1602              :     /* We need up-to-date sync state info for subscription tables here. */
    1603          221 :     FetchRelationStates(&has_tables, NULL, &started_tx);
    1604              : 
    1605          221 :     if (started_tx)
    1606              :     {
    1607           13 :         CommitTransactionCommand();
    1608           13 :         pgstat_report_stat(true);
    1609              :     }
    1610              : 
    1611              :     /*
    1612              :      * Return false when the subscription has no tables or some table is still
    1613              :      * listed in table_states_not_ready; true otherwise.
    1614              :      */
    1615          221 :     return has_tables && (table_states_not_ready == NIL);
    1616              : }
    1617              : 
    /*
     * HasSubscriptionTablesCached
     *      Tell whether the subscription currently has any tables.
     *
     * In contrast to HasSubscriptionTables(), the answer comes from the cached
     * subscription table information.  MySubscription must be initialized
     * beforehand, so do not call this outside of apply or tablesync workers.
     */
    bool
    HasSubscriptionTablesCached(void)
    {
        bool        has_tables;
        bool        xact_started;

        /* Make sure the subscription table information is up to date. */
        FetchRelationStates(&has_tables, NULL, &xact_started);

        /* Nothing to clean up unless the refresh reports a started transaction. */
        if (!xact_started)
            return has_tables;

        CommitTransactionCommand();
        pgstat_report_stat(true);

        return has_tables;
    }
    1643              : 
    1644              : /*
    1645              :  * Update the two_phase state of the specified subscription in pg_subscription.
    1646              :  */
    1647              : void
    1648            8 : UpdateTwoPhaseState(Oid suboid, char new_state)
    1649              : {
    1650              :     Relation    rel;
    1651              :     HeapTuple   tup;
    1652              :     bool        nulls[Natts_pg_subscription];
    1653              :     bool        replaces[Natts_pg_subscription];
    1654              :     Datum       values[Natts_pg_subscription];
    1655              : 
    1656              :     Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
    1657              :            new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
    1658              :            new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
    1659              : 
    1660            8 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    1661            8 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
    1662            8 :     if (!HeapTupleIsValid(tup))
    1663            0 :         elog(ERROR,
    1664              :              "cache lookup failed for subscription oid %u",
    1665              :              suboid);
    1666              : 
    1667              :     /* Form a new tuple. */
    1668            8 :     memset(values, 0, sizeof(values));
    1669            8 :     memset(nulls, false, sizeof(nulls));
    1670            8 :     memset(replaces, false, sizeof(replaces));
    1671              : 
    1672              :     /* And update/set two_phase state */
    1673            8 :     values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
    1674            8 :     replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
    1675              : 
    1676            8 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel),
    1677              :                             values, nulls, replaces);
    1678            8 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    1679              : 
    1680            8 :     heap_freetuple(tup);
    1681            8 :     table_close(rel, RowExclusiveLock);
    1682            8 : }
        

Generated by: LCOV version 2.0-1