LCOV - code coverage report
Current view: top level - src/backend/replication/logical - tablesync.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 443 485 91.3 %
Date: 2026-02-01 21:16:50 Functions: 16 16 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * tablesync.c
       3             :  *    PostgreSQL logical replication: initial table data synchronization
       4             :  *
       5             :  * Copyright (c) 2012-2026, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/tablesync.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file contains code for initial table data synchronization for
      12             :  *    logical replication.
      13             :  *
      14             :  *    The initial data synchronization is done separately for each table,
      15             :  *    in a separate apply worker that only fetches the initial snapshot data
      16             :  *    from the publisher and then synchronizes the position in the stream with
      17             :  *    the leader apply worker.
      18             :  *
      19             :  *    There are several reasons for doing the synchronization this way:
      20             :  *     - It allows us to parallelize the initial data synchronization
      21             :  *       which lowers the time needed for it to happen.
      22             :  *     - The initial synchronization does not have to hold the xid and LSN
      23             :  *       for the time it takes to copy data of all tables, causing less
      24             :  *       bloat and lower disk consumption compared to doing the
      25             :  *       synchronization in a single process for the whole database.
      26             :  *     - It allows us to synchronize any tables added after the initial
      27             :  *       synchronization has finished.
      28             :  *
      29             :  *    The stream position synchronization works in multiple steps:
      30             :  *     - Apply worker requests a tablesync worker to start, setting the new
      31             :  *       table state to INIT.
      32             :  *     - Tablesync worker starts; changes table state from INIT to DATASYNC while
      33             :  *       copying.
      34             :  *     - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
      35             :  *       worker specific) state to indicate when the copy phase has completed, so
      36             :  *       if the worker crashes with this (non-memory) state then the copy will not
      37             :  *       be re-attempted.
      38             :  *     - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
      39             :  *     - Apply worker periodically checks for tables in SYNCWAIT state.  When
      40             :  *       any appear, it sets the table state to CATCHUP and starts loop-waiting
      41             :  *       until either the table state is set to SYNCDONE or the sync worker
      42             :  *       exits.
      43             :  *     - After the sync worker has seen the state change to CATCHUP, it will
      44             :  *       read the stream and apply changes (acting like an apply worker) until
      45             :  *       it catches up to the specified stream position.  Then it sets the
      46             :  *       state to SYNCDONE.  There might be zero changes applied between
      47             :  *       CATCHUP and SYNCDONE, because the sync worker might be ahead of the
      48             :  *       apply worker.
      49             :  *     - Once the state is set to SYNCDONE, the apply will continue tracking
      50             :  *       the table until it reaches the SYNCDONE stream position, at which
      51             :  *       point it sets state to READY and stops tracking.  Again, there might
      52             :  *       be zero changes in between.
      53             :  *
      54             :  *    So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
      55             :  *    -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
      56             :  *
      57             :  *    The catalog pg_subscription_rel is used to keep information about
      58             :  *    subscribed tables and their state.  The catalog holds all states
      59             :  *    except SYNCWAIT and CATCHUP which are only in shared memory.
      60             :  *
      61             :  *    Example flows look like this:
      62             :  *     - Apply is in front:
      63             :  *        sync:8
      64             :  *          -> set in catalog FINISHEDCOPY
      65             :  *          -> set in memory SYNCWAIT
      66             :  *        apply:10
      67             :  *          -> set in memory CATCHUP
      68             :  *          -> enter wait-loop
      69             :  *        sync:10
      70             :  *          -> set in catalog SYNCDONE
      71             :  *          -> exit
      72             :  *        apply:10
      73             :  *          -> exit wait-loop
      74             :  *          -> continue rep
      75             :  *        apply:11
      76             :  *          -> set in catalog READY
      77             :  *
      78             :  *     - Sync is in front:
      79             :  *        sync:10
      80             :  *          -> set in catalog FINISHEDCOPY
      81             :  *          -> set in memory SYNCWAIT
      82             :  *        apply:8
      83             :  *          -> set in memory CATCHUP
      84             :  *          -> continue per-table filtering
      85             :  *        sync:10
      86             :  *          -> set in catalog SYNCDONE
      87             :  *          -> exit
      88             :  *        apply:10
      89             :  *          -> set in catalog READY
      90             :  *          -> stop per-table filtering
      91             :  *          -> continue rep
      92             :  *-------------------------------------------------------------------------
      93             :  */
      94             : 
      95             : #include "postgres.h"
      96             : 
      97             : #include "access/table.h"
      98             : #include "access/xact.h"
      99             : #include "catalog/indexing.h"
     100             : #include "catalog/pg_subscription_rel.h"
     101             : #include "catalog/pg_type.h"
     102             : #include "commands/copy.h"
     103             : #include "miscadmin.h"
     104             : #include "nodes/makefuncs.h"
     105             : #include "parser/parse_relation.h"
     106             : #include "pgstat.h"
     107             : #include "replication/logicallauncher.h"
     108             : #include "replication/logicalrelation.h"
     109             : #include "replication/logicalworker.h"
     110             : #include "replication/origin.h"
     111             : #include "replication/slot.h"
     112             : #include "replication/walreceiver.h"
     113             : #include "replication/worker_internal.h"
     114             : #include "storage/ipc.h"
     115             : #include "storage/lmgr.h"
     116             : #include "utils/acl.h"
     117             : #include "utils/array.h"
     118             : #include "utils/builtins.h"
     119             : #include "utils/lsyscache.h"
     120             : #include "utils/rls.h"
     121             : #include "utils/snapmgr.h"
     122             : #include "utils/syscache.h"
     123             : #include "utils/usercontext.h"
     124             : 
     125             : List       *table_states_not_ready = NIL;
     126             : 
     127             : static StringInfo copybuf = NULL;
     128             : 
     129             : /*
     130             :  * Wait until the relation sync state is set in the catalog to the expected
     131             :  * one; return true when it happens.
     132             :  *
     133             :  * Returns false if the table sync worker or the table itself have
     134             :  * disappeared, or the table state has been reset.
     135             :  *
     136             :  * Currently, this is used in the apply worker when transitioning from
     137             :  * CATCHUP state to SYNCDONE.
     138             :  */
     139             : static bool
     140         360 : wait_for_table_state_change(Oid relid, char expected_state)
     141             : {
     142             :     char        state;
     143             : 
     144             :     for (;;)
     145         406 :     {
     146             :         LogicalRepWorker *worker;
     147             :         XLogRecPtr  statelsn;
     148             : 
     149         766 :         CHECK_FOR_INTERRUPTS();
     150             : 
     151         766 :         InvalidateCatalogSnapshot();
     152         766 :         state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
     153             :                                         relid, &statelsn);
     154             : 
     155         766 :         if (state == SUBREL_STATE_UNKNOWN)
     156           0 :             break;
     157             : 
     158         766 :         if (state == expected_state)
     159         360 :             return true;
     160             : 
     161             :         /* Check if the sync worker is still running and bail if not. */
     162         406 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     163         406 :         worker = logicalrep_worker_find(WORKERTYPE_TABLESYNC,
     164         406 :                                         MyLogicalRepWorker->subid, relid,
     165             :                                         false);
     166         406 :         LWLockRelease(LogicalRepWorkerLock);
     167         406 :         if (!worker)
     168           0 :             break;
     169             : 
     170         406 :         (void) WaitLatch(MyLatch,
     171             :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     172             :                          1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
     173             : 
     174         406 :         ResetLatch(MyLatch);
     175             :     }
     176             : 
     177           0 :     return false;
     178             : }
     179             : 
     180             : /*
     181             :  * Wait until the apply worker changes the state of our synchronization
     182             :  * worker to the expected one.
     183             :  *
     184             :  * Used when transitioning from SYNCWAIT state to CATCHUP.
     185             :  *
     186             :  * Returns false if the apply worker has disappeared.
     187             :  */
     188             : static bool
     189         372 : wait_for_worker_state_change(char expected_state)
     190             : {
     191             :     int         rc;
     192             : 
     193             :     for (;;)
     194         372 :     {
     195             :         LogicalRepWorker *worker;
     196             : 
     197         744 :         CHECK_FOR_INTERRUPTS();
     198             : 
     199             :         /*
     200             :          * Done if already in correct state.  (We assume this fetch is atomic
     201             :          * enough to not give a misleading answer if we do it with no lock.)
     202             :          */
     203         744 :         if (MyLogicalRepWorker->relstate == expected_state)
     204         372 :             return true;
     205             : 
     206             :         /*
     207             :          * Bail out if the apply worker has died, else signal it we're
     208             :          * waiting.
     209             :          */
     210         372 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     211         372 :         worker = logicalrep_worker_find(WORKERTYPE_APPLY,
     212         372 :                                         MyLogicalRepWorker->subid, InvalidOid,
     213             :                                         false);
     214         372 :         if (worker && worker->proc)
     215         372 :             logicalrep_worker_wakeup_ptr(worker);
     216         372 :         LWLockRelease(LogicalRepWorkerLock);
     217         372 :         if (!worker)
     218           0 :             break;
     219             : 
     220             :         /*
     221             :          * Wait.  We expect to get a latch signal back from the apply worker,
     222             :          * but use a timeout in case it dies without sending one.
     223             :          */
     224         372 :         rc = WaitLatch(MyLatch,
     225             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     226             :                        1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
     227             : 
     228         372 :         if (rc & WL_LATCH_SET)
     229         372 :             ResetLatch(MyLatch);
     230             :     }
     231             : 
     232           0 :     return false;
     233             : }
     234             : 
     235             : /*
     236             :  * Handle table synchronization cooperation from the synchronization
     237             :  * worker.
     238             :  *
     239             :  * If the sync worker is in CATCHUP state and reached (or passed) the
     240             :  * predetermined synchronization point in the WAL stream, mark the table as
     241             :  * SYNCDONE and finish.
     242             :  */
     243             : void
     244         454 : ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
     245             : {
     246         454 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
     247             : 
     248         454 :     if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
     249         454 :         current_lsn >= MyLogicalRepWorker->relstate_lsn)
     250             :     {
     251             :         TimeLineID  tli;
     252         372 :         char        syncslotname[NAMEDATALEN] = {0};
     253         372 :         char        originname[NAMEDATALEN] = {0};
     254             : 
     255         372 :         MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
     256         372 :         MyLogicalRepWorker->relstate_lsn = current_lsn;
     257             : 
     258         372 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
     259             : 
     260             :         /*
     261             :          * UpdateSubscriptionRelState must be called within a transaction.
     262             :          */
     263         372 :         if (!IsTransactionState())
     264         372 :             StartTransactionCommand();
     265             : 
     266         372 :         UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
     267         372 :                                    MyLogicalRepWorker->relid,
     268         372 :                                    MyLogicalRepWorker->relstate,
     269         372 :                                    MyLogicalRepWorker->relstate_lsn,
     270             :                                    false);
     271             : 
     272             :         /*
     273             :          * End streaming so that LogRepWorkerWalRcvConn can be used to drop
     274             :          * the slot.
     275             :          */
     276         372 :         walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
     277             : 
     278             :         /*
     279             :          * Cleanup the tablesync slot.
     280             :          *
     281             :          * This has to be done after updating the state because otherwise, if
     282             :          * there is an error while doing the database operations, we won't be
     283             :          * able to roll back the dropped slot.
     284             :          */
     285         372 :         ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
     286         372 :                                         MyLogicalRepWorker->relid,
     287             :                                         syncslotname,
     288             :                                         sizeof(syncslotname));
     289             : 
     290             :         /*
     291             :          * It is important to give an error if we are unable to drop the slot,
     292             :          * otherwise, it won't be dropped till the corresponding subscription
     293             :          * is dropped. So passing missing_ok = false.
     294             :          */
     295         372 :         ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
     296             : 
     297         372 :         CommitTransactionCommand();
     298         372 :         pgstat_report_stat(false);
     299             : 
     300             :         /*
     301             :          * Start a new transaction to clean up the tablesync origin tracking.
     302             :          * This transaction will be ended within FinishSyncWorker().  Even if
     303             :          * we fail to remove the origin here, the apply worker will make sure
     304             :          * to clean it up afterward.
     305             :          *
     306             :          * We need to do this after the table state is set to SYNCDONE.
     307             :          * Otherwise, if an error occurs while performing the database
     308             :          * operation, the worker will be restarted, but the in-memory state of
     309             :          * replication progress (remote_lsn), which would already have been
     310             :          * cleared, won't be rolled back.  The restarted worker would then use
     311             :          * invalid replication progress state, resulting in replay of
     312             :          * transactions that have already been applied.
     313             :          */
     314         372 :         StartTransactionCommand();
     315             : 
     316         372 :         ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
     317         372 :                                            MyLogicalRepWorker->relid,
     318             :                                            originname,
     319             :                                            sizeof(originname));
     320             : 
     321             :         /*
     322             :          * Resetting the origin session removes the ownership of the slot.
     323             :          * This is needed to allow the origin to be dropped.
     324             :          */
     325         372 :         replorigin_session_reset();
     326         372 :         replorigin_xact_clear(true);
     327             : 
     328             :         /*
     329             :          * Drop the tablesync's origin tracking if it exists.
     330             :          *
     331             :          * There is a chance that the user is concurrently performing refresh
     332             :          * for the subscription where we remove the table state and its origin
     333             :          * or the apply worker would have removed this origin. So passing
     334             :          * missing_ok = true.
     335             :          */
     336         372 :         replorigin_drop_by_name(originname, true, false);
     337             : 
     338         372 :         FinishSyncWorker();
     339             :     }
     340             :     else
     341          82 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
     342          82 : }
     343             : 
     344             : /*
     345             :  * Handle table synchronization cooperation from the apply worker.
     346             :  *
     347             :  * Walk over all subscription tables that are individually tracked by the
     348             :  * apply process (currently, all that have state other than
     349             :  * SUBREL_STATE_READY) and manage synchronization for them.
     350             :  *
     351             :  * If there are tables that need synchronizing and are not being synchronized
     352             :  * yet, start sync workers for them (if there are free slots for sync
     353             :  * workers).  To prevent starting the sync worker for the same relation at a
     354             :  * high frequency after a failure, we store its last start time with each sync
     355             :  * state info.  We start the sync worker for the same relation after waiting
     356             :  * at least wal_retrieve_retry_interval.
     357             :  *
     358             :  * For tables that are being synchronized already, check if sync workers
     359             :  * either need action from the apply worker or have finished.  This is the
     360             :  * SYNCWAIT to CATCHUP transition.
     361             :  *
     362             :  * If the synchronization position is reached (SYNCDONE), then the table can
     363             :  * be marked as READY and is no longer tracked.
     364             :  */
     365             : void
     366       12020 : ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
     367             : {
     368             :     struct tablesync_start_time_mapping
     369             :     {
     370             :         Oid         relid;
     371             :         TimestampTz last_start_time;
     372             :     };
     373             :     static HTAB *last_start_times = NULL;
     374             :     ListCell   *lc;
     375             :     bool        started_tx;
     376       12020 :     bool        should_exit = false;
     377       12020 :     Relation    rel = NULL;
     378             : 
     379             :     Assert(!IsTransactionState());
     380             : 
     381             :     /* We need up-to-date sync state info for subscription tables here. */
     382       12020 :     FetchRelationStates(NULL, NULL, &started_tx);
     383             : 
     384             :     /*
     385             :      * Prepare a hash table for tracking last start times of workers, to avoid
     386             :      * immediate restarts.  We don't need it if there are no tables that need
     387             :      * syncing.
     388             :      */
     389       12020 :     if (table_states_not_ready != NIL && !last_start_times)
     390         240 :     {
     391             :         HASHCTL     ctl;
     392             : 
     393         240 :         ctl.keysize = sizeof(Oid);
     394         240 :         ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
     395         240 :         last_start_times = hash_create("Logical replication table sync worker start times",
     396             :                                        256, &ctl, HASH_ELEM | HASH_BLOBS);
     397             :     }
     398             : 
     399             :     /*
     400             :      * Clean up the hash table when we're done with all tables (just to
     401             :      * release the bit of memory).
     402             :      */
     403       11780 :     else if (table_states_not_ready == NIL && last_start_times)
     404             :     {
     405         178 :         hash_destroy(last_start_times);
     406         178 :         last_start_times = NULL;
     407             :     }
     408             : 
     409             :     /*
     410             :      * Process all tables that are being synchronized.
     411             :      */
     412       15508 :     foreach(lc, table_states_not_ready)
     413             :     {
     414        3490 :         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
     415             : 
     416        3490 :         if (!started_tx)
     417             :         {
     418         596 :             StartTransactionCommand();
     419         596 :             started_tx = true;
     420             :         }
     421             : 
     422             :         Assert(get_rel_relkind(rstate->relid) != RELKIND_SEQUENCE);
     423             : 
     424        3490 :         if (rstate->state == SUBREL_STATE_SYNCDONE)
     425             :         {
     426             :             /*
     427             :              * Apply has caught up to the position where the table sync has
     428             :              * finished.  Mark the table as ready so that the apply will just
     429             :              * continue to replicate it normally.
     430             :              */
     431         358 :             if (current_lsn >= rstate->lsn)
     432             :             {
     433             :                 char        originname[NAMEDATALEN];
     434             : 
     435         354 :                 rstate->state = SUBREL_STATE_READY;
     436         354 :                 rstate->lsn = current_lsn;
     437             : 
     438             :                 /*
     439             :                  * Remove the tablesync origin tracking if it exists.
     440             :                  *
     441             :                  * The user may be concurrently refreshing the subscription,
     442             :                  * which removes the table state and its origin, or the
     443             :                  * tablesync worker may have already removed this origin.  We
     444             :                  * can't rely on the tablesync worker to remove the origin
     445             :                  * tracking because, if there is an error while dropping it,
     446             :                  * we won't restart the worker to drop the origin.  So pass
     447             :                  * missing_ok = true.
     448             :                  *
     449             :                  * Lock the subscription and origin in the same order as we
     450             :                  * are doing during DDL commands to avoid deadlocks. See
     451             :                  * AlterSubscription_refresh.
     452             :                  */
     453         354 :                 LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
     454             :                                  0, AccessShareLock);
     455             : 
     456         354 :                 if (!rel)
     457         354 :                     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     458             : 
     459         354 :                 ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
     460             :                                                    rstate->relid,
     461             :                                                    originname,
     462             :                                                    sizeof(originname));
     463         354 :                 replorigin_drop_by_name(originname, true, false);
     464             : 
     465             :                 /*
     466             :                  * Update the state to READY only after the origin cleanup.
     467             :                  */
     468         354 :                 UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
     469         354 :                                            rstate->relid, rstate->state,
     470             :                                            rstate->lsn, true);
     471             :             }
     472             :         }
     473             :         else
     474             :         {
     475             :             LogicalRepWorker *syncworker;
     476             : 
     477             :             /*
     478             :              * Look for a sync worker for this relation.
     479             :              */
     480        3132 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     481             : 
     482        3132 :             syncworker = logicalrep_worker_find(WORKERTYPE_TABLESYNC,
     483        3132 :                                                 MyLogicalRepWorker->subid,
     484             :                                                 rstate->relid, false);
     485             : 
     486        3132 :             if (syncworker)
     487             :             {
     488             :                 /* Found one, update our copy of its state */
     489        1390 :                 SpinLockAcquire(&syncworker->relmutex);
     490        1390 :                 rstate->state = syncworker->relstate;
     491        1390 :                 rstate->lsn = syncworker->relstate_lsn;
     492        1390 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
     493             :                 {
     494             :                     /*
     495             :                      * Sync worker is waiting for apply.  Tell sync worker it
     496             :                      * can catchup now.
     497             :                      */
     498         360 :                     syncworker->relstate = SUBREL_STATE_CATCHUP;
     499         360 :                     syncworker->relstate_lsn =
     500         360 :                         Max(syncworker->relstate_lsn, current_lsn);
     501             :                 }
     502        1390 :                 SpinLockRelease(&syncworker->relmutex);
     503             : 
     504             :                 /* If we told worker to catch up, wait for it. */
     505        1390 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
     506             :                 {
     507             :                     /* Signal the sync worker, as it may be waiting for us. */
     508         360 :                     if (syncworker->proc)
     509         360 :                         logicalrep_worker_wakeup_ptr(syncworker);
     510             : 
     511             :                     /* Now safe to release the LWLock */
     512         360 :                     LWLockRelease(LogicalRepWorkerLock);
     513             : 
     514         360 :                     if (started_tx)
     515             :                     {
     516             :                         /*
     517             :                          * We must commit the existing transaction to release
     518             :                          * the existing locks before entering a busy loop.
     519             :                          * This is required to avoid undetected deadlocks on
     520             :                          * any existing lock, because the deadlock detector
     521             :                          * cannot detect waits on the latch.
     522             :                          *
     523             :                          * Also close any tables prior to the commit.
     524             :                          */
     525         360 :                         if (rel)
     526             :                         {
     527          54 :                             table_close(rel, NoLock);
     528          54 :                             rel = NULL;
     529             :                         }
     530         360 :                         CommitTransactionCommand();
     531         360 :                         pgstat_report_stat(false);
     532             :                     }
     533             : 
     534             :                     /*
     535             :                      * Enter busy loop and wait for synchronization worker to
     536             :                      * reach expected state (or die trying).
     537             :                      */
     538         360 :                     StartTransactionCommand();
     539         360 :                     started_tx = true;
     540             : 
     541         360 :                     wait_for_table_state_change(rstate->relid,
     542             :                                                 SUBREL_STATE_SYNCDONE);
     543             :                 }
     544             :                 else
     545        1030 :                     LWLockRelease(LogicalRepWorkerLock);
     546             :             }
     547             :             else
     548             :             {
     549             :                 /*
     550             :                  * If there is no sync worker for this table yet, count
     551             :                  * running sync workers for this subscription, while we have
     552             :                  * the lock.
     553             :                  */
     554             :                 int         nsyncworkers =
     555        1742 :                     logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
     556             :                 struct tablesync_start_time_mapping *hentry;
     557             :                 bool        found;
     558             : 
     559             :                 /* Now safe to release the LWLock */
     560        1742 :                 LWLockRelease(LogicalRepWorkerLock);
     561             : 
     562        1742 :                 hentry = hash_search(last_start_times, &rstate->relid,
     563             :                                      HASH_ENTER, &found);
     564        1742 :                 if (!found)
     565         376 :                     hentry->last_start_time = 0;
     566             : 
     567        1742 :                 launch_sync_worker(WORKERTYPE_TABLESYNC, nsyncworkers,
     568             :                                    rstate->relid, &hentry->last_start_time);
     569             :             }
     570             :         }
     571             :     }
     572             : 
     573             :     /* Close table if opened */
     574       12018 :     if (rel)
     575         300 :         table_close(rel, NoLock);
     576             : 
     577             : 
     578       12018 :     if (started_tx)
     579             :     {
     580             :         /*
     581             :          * Even when the two_phase mode is requested by the user, it remains
     582             :          * as 'pending' until all tablesyncs have reached READY state.
     583             :          *
     584             :          * When this happens, we restart the apply worker and (if the
     585             :          * conditions are still ok) then the two_phase tri-state will become
     586             :          * 'enabled' at that time.
     587             :          *
     588             :          * Note: If the subscription has no tables then leave the state as
     589             :          * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
     590             :          * work.
     591             :          */
     592        2084 :         if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
     593             :         {
     594          76 :             CommandCounterIncrement();  /* make updates visible */
     595          76 :             if (AllTablesyncsReady())
     596             :             {
     597          12 :                 ereport(LOG,
     598             :                         (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
     599             :                                 MySubscription->name)));
     600          12 :                 should_exit = true;
     601             :             }
     602             :         }
     603             : 
     604        2084 :         CommitTransactionCommand();
     605        2084 :         pgstat_report_stat(true);
     606             :     }
     607             : 
     608       12018 :     if (should_exit)
     609             :     {
     610             :         /*
     611             :          * Reset the last-start time for this worker so that the launcher will
     612             :          * restart it without waiting for wal_retrieve_retry_interval.
     613             :          */
     614          12 :         ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
     615             : 
     616          12 :         proc_exit(0);
     617             :     }
     618       12006 : }
     619             : 
     620             : /*
     621             :  * Create list of columns for COPY based on logical relation mapping.
     622             :  */
     623             : static List *
     624         394 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
     625             : {
     626         394 :     List       *attnamelist = NIL;
     627             :     int         i;
     628             : 
     629        1052 :     for (i = 0; i < rel->remoterel.natts; i++)
     630             :     {
     631         658 :         attnamelist = lappend(attnamelist,
     632         658 :                               makeString(rel->remoterel.attnames[i]));
     633             :     }
     634             : 
     635             : 
     636         394 :     return attnamelist;
     637             : }
     638             : 
     639             : /*
     640             :  * Data source callback for the COPY FROM, which reads from the remote
     641             :  * connection and passes the data back to our local COPY.
     642             :  */
     643             : static int
     644       28008 : copy_read_data(void *outbuf, int minread, int maxread)
     645             : {
     646       28008 :     int         bytesread = 0;
     647             :     int         avail;
     648             : 
     649             :     /* If there is leftover data from a previous read, use it. */
     650       28008 :     avail = copybuf->len - copybuf->cursor;
     651       28008 :     if (avail)
     652             :     {
     653           0 :         if (avail > maxread)
     654           0 :             avail = maxread;
     655           0 :         memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
     656           0 :         copybuf->cursor += avail;
     657           0 :         maxread -= avail;
     658           0 :         bytesread += avail;
     659             :     }
     660             : 
     661       28038 :     while (maxread > 0 && bytesread < minread)
     662             :     {
     663       28038 :         pgsocket    fd = PGINVALID_SOCKET;
     664             :         int         len;
     665       28038 :         char       *buf = NULL;
     666             : 
     667             :         for (;;)
     668             :         {
     669             :             /* Try to read the data. */
     670       28038 :             len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
     671             : 
     672       28038 :             CHECK_FOR_INTERRUPTS();
     673             : 
     674       28038 :             if (len == 0)
     675          30 :                 break;
     676       28008 :             else if (len < 0)
     677       28008 :                 return bytesread;
     678             :             else
     679             :             {
     680             :                 /* Process the data */
     681       27618 :                 copybuf->data = buf;
     682       27618 :                 copybuf->len = len;
     683       27618 :                 copybuf->cursor = 0;
     684             : 
     685       27618 :                 avail = copybuf->len - copybuf->cursor;
     686       27618 :                 if (avail > maxread)
     687           0 :                     avail = maxread;
     688       27618 :                 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
     689       27618 :                 outbuf = (char *) outbuf + avail;
     690       27618 :                 copybuf->cursor += avail;
     691       27618 :                 maxread -= avail;
     692       27618 :                 bytesread += avail;
     693             :             }
     694             : 
     695       27618 :             if (maxread <= 0 || bytesread >= minread)
     696       27618 :                 return bytesread;
     697             :         }
     698             : 
     699             :         /*
     700             :          * Wait for more data or latch.
     701             :          */
     702          30 :         (void) WaitLatchOrSocket(MyLatch,
     703             :                                  WL_SOCKET_READABLE | WL_LATCH_SET |
     704             :                                  WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     705             :                                  fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
     706             : 
     707          30 :         ResetLatch(MyLatch);
     708             :     }
     709             : 
     710           0 :     return bytesread;
     711             : }
     712             : 
     713             : 
     714             : /*
     715             :  * Get information about the remote relation, similar to what the RELATION
     716             :  * message provides during replication.
     717             :  *
     718             :  * This function also returns (a) the relation qualifications to be used in
     719             :  * the COPY command, and (b) whether the remote relation has published any
     720             :  * generated column.
     721             :  */
     722             : static void
     723         398 : fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
     724             :                         List **qual, bool *gencol_published)
     725             : {
     726             :     WalRcvExecResult *res;
     727             :     StringInfoData cmd;
     728             :     TupleTableSlot *slot;
     729         398 :     Oid         tableRow[] = {OIDOID, CHAROID, CHAROID};
     730         398 :     Oid         attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
     731         398 :     Oid         qualRow[] = {TEXTOID};
     732             :     bool        isnull;
     733             :     int         natt;
     734         398 :     StringInfo  pub_names = NULL;
     735         398 :     Bitmapset  *included_cols = NULL;
     736         398 :     int         server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
     737             : 
     738         398 :     lrel->nspname = nspname;
     739         398 :     lrel->relname = relname;
     740             : 
     741             :     /* First fetch Oid and replica identity. */
     742         398 :     initStringInfo(&cmd);
     743         398 :     appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
     744             :                      "  FROM pg_catalog.pg_class c"
     745             :                      "  INNER JOIN pg_catalog.pg_namespace n"
     746             :                      "        ON (c.relnamespace = n.oid)"
     747             :                      " WHERE n.nspname = %s"
     748             :                      "   AND c.relname = %s",
     749             :                      quote_literal_cstr(nspname),
     750             :                      quote_literal_cstr(relname));
     751         398 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
     752             :                       lengthof(tableRow), tableRow);
     753             : 
     754         398 :     if (res->status != WALRCV_OK_TUPLES)
     755           0 :         ereport(ERROR,
     756             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
     757             :                  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
     758             :                         nspname, relname, res->err)));
     759             : 
     760         398 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     761         398 :     if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     762           0 :         ereport(ERROR,
     763             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     764             :                  errmsg("table \"%s.%s\" not found on publisher",
     765             :                         nspname, relname)));
     766             : 
     767         398 :     lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
     768             :     Assert(!isnull);
     769         398 :     lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
     770             :     Assert(!isnull);
     771         398 :     lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
     772             :     Assert(!isnull);
     773             : 
     774         398 :     ExecDropSingleTupleTableSlot(slot);
     775         398 :     walrcv_clear_result(res);
     776             : 
     777             : 
     778             :     /*
     779             :      * Get the column list for this relation.
     780             :      *
     781             :      * We need to do this before fetching info about column names and types,
     782             :      * so that we can skip columns that should not be replicated.
     783             :      */
     784         398 :     if (server_version >= 150000)
     785             :     {
     786             :         WalRcvExecResult *pubres;
     787             :         TupleTableSlot *tslot;
     788         398 :         Oid         attrsRow[] = {INT2VECTOROID};
     789             : 
     790             :         /* Build the pub_names comma-separated string. */
     791         398 :         pub_names = makeStringInfo();
     792         398 :         GetPublicationsStr(MySubscription->publications, pub_names, true);
     793             : 
     794             :         /*
     795             :          * Fetch info about column lists for the relation (from all the
     796             :          * publications).
     797             :          */
     798         398 :         resetStringInfo(&cmd);
     799         398 :         appendStringInfo(&cmd,
     800             :                          "SELECT DISTINCT"
     801             :                          "  (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
     802             :                          "   THEN NULL ELSE gpt.attrs END)"
     803             :                          "  FROM pg_publication p,"
     804             :                          "  LATERAL pg_get_publication_tables(p.pubname) gpt,"
     805             :                          "  pg_class c"
     806             :                          " WHERE gpt.relid = %u AND c.oid = gpt.relid"
     807             :                          "   AND p.pubname IN ( %s )",
     808             :                          lrel->remoteid,
     809             :                          pub_names->data);
     810             : 
     811         398 :         pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
     812             :                              lengthof(attrsRow), attrsRow);
     813             : 
     814         398 :         if (pubres->status != WALRCV_OK_TUPLES)
     815           0 :             ereport(ERROR,
     816             :                     (errcode(ERRCODE_CONNECTION_FAILURE),
     817             :                      errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
     818             :                             nspname, relname, pubres->err)));
     819             : 
     820             :         /*
     821             :          * We don't support the case where the column list is different for
     822             :          * the same table when combining publications. See comments atop
     823             :          * fetch_relation_list. So there should be only one row returned.
     824             :          * Although we already checked this when creating the subscription, we
     825             :          * still need to check here in case the column list was changed after
     826             :          * creating the subscription and before the sync worker is started.
     827             :          */
     828         398 :         if (tuplestore_tuple_count(pubres->tuplestore) > 1)
     829           0 :             ereport(ERROR,
     830             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     831             :                     errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
     832             :                            nspname, relname));
     833             : 
     834             :         /*
     835             :          * Get the column list and build a single bitmap with the attnums.
     836             :          *
     837             :          * If we find a NULL value, it means all the columns should be
     838             :          * replicated.
     839             :          */
     840         398 :         tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
     841         398 :         if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
     842             :         {
     843         398 :             Datum       cfval = slot_getattr(tslot, 1, &isnull);
     844             : 
     845         398 :             if (!isnull)
     846             :             {
     847             :                 ArrayType  *arr;
     848             :                 int         nelems;
     849             :                 int16      *elems;
     850             : 
     851          44 :                 arr = DatumGetArrayTypeP(cfval);
     852          44 :                 nelems = ARR_DIMS(arr)[0];
     853          44 :                 elems = (int16 *) ARR_DATA_PTR(arr);
     854             : 
     855         118 :                 for (natt = 0; natt < nelems; natt++)
     856          74 :                     included_cols = bms_add_member(included_cols, elems[natt]);
     857             :             }
     858             : 
     859         398 :             ExecClearTuple(tslot);
     860             :         }
     861         398 :         ExecDropSingleTupleTableSlot(tslot);
     862             : 
     863         398 :         walrcv_clear_result(pubres);
     864             :     }
     865             : 
     866             :     /*
     867             :      * Now fetch column names and types.
     868             :      */
     869         398 :     resetStringInfo(&cmd);
     870         398 :     appendStringInfoString(&cmd,
     871             :                            "SELECT a.attnum,"
     872             :                            "       a.attname,"
     873             :                            "       a.atttypid,"
     874             :                            "       a.attnum = ANY(i.indkey)");
     875             : 
     876             :     /* Generated columns can be replicated since version 18. */
     877         398 :     if (server_version >= 180000)
     878         398 :         appendStringInfoString(&cmd, ", a.attgenerated != ''");
     879             : 
     880         796 :     appendStringInfo(&cmd,
     881             :                      "  FROM pg_catalog.pg_attribute a"
     882             :                      "  LEFT JOIN pg_catalog.pg_index i"
     883             :                      "       ON (i.indexrelid = pg_get_replica_identity_index(%u))"
     884             :                      " WHERE a.attnum > 0::pg_catalog.int2"
     885             :                      "   AND NOT a.attisdropped %s"
     886             :                      "   AND a.attrelid = %u"
     887             :                      " ORDER BY a.attnum",
     888             :                      lrel->remoteid,
     889         398 :                      (server_version >= 120000 && server_version < 180000 ?
     890             :                       "AND a.attgenerated = ''" : ""),
     891             :                      lrel->remoteid);
     892         398 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
     893             :                       server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
     894             : 
     895         398 :     if (res->status != WALRCV_OK_TUPLES)
     896           0 :         ereport(ERROR,
     897             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
     898             :                  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
     899             :                         nspname, relname, res->err)));
     900             : 
     901             :     /* We don't know the number of rows coming, so allocate enough space. */
     902         398 :     lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
     903         398 :     lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
     904         398 :     lrel->attkeys = NULL;
     905             : 
     906             :     /*
     907             :      * Store the columns as a list of names.  Ignore those that are not
     908             :      * present in the column list, if there is one.
     909             :      */
     910         398 :     natt = 0;
     911         398 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     912        1130 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     913             :     {
     914             :         char       *rel_colname;
     915             :         AttrNumber  attnum;
     916             : 
     917         732 :         attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
     918             :         Assert(!isnull);
     919             : 
     920             :         /* If the column is not in the column list, skip it. */
     921         732 :         if (included_cols != NULL && !bms_is_member(attnum, included_cols))
     922             :         {
     923          62 :             ExecClearTuple(slot);
     924          62 :             continue;
     925             :         }
     926             : 
     927         670 :         rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
     928             :         Assert(!isnull);
     929             : 
     930         670 :         lrel->attnames[natt] = rel_colname;
     931         670 :         lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
     932             :         Assert(!isnull);
     933             : 
     934         670 :         if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
     935         220 :             lrel->attkeys = bms_add_member(lrel->attkeys, natt);
     936             : 
     937             :         /* Remember if the remote table has published any generated column. */
     938         670 :         if (server_version >= 180000 && !(*gencol_published))
     939             :         {
     940         670 :             *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
     941             :             Assert(!isnull);
     942             :         }
     943             : 
     944             :         /* Should never happen. */
     945         670 :         if (++natt >= MaxTupleAttributeNumber)
     946           0 :             elog(ERROR, "too many columns in remote table \"%s.%s\"",
     947             :                  nspname, relname);
     948             : 
     949         670 :         ExecClearTuple(slot);
     950             :     }
     951         398 :     ExecDropSingleTupleTableSlot(slot);
     952             : 
     953         398 :     lrel->natts = natt;
     954             : 
     955         398 :     walrcv_clear_result(res);
     956             : 
     957             :     /*
     958             :      * Get relation's row filter expressions. DISTINCT avoids the same
     959             :      * expression of a table in multiple publications from being included
     960             :      * multiple times in the final expression.
     961             :      *
     962             :      * We need to copy the row even if it matches just one of the
     963             :      * publications, so we later combine all the quals with OR.
     964             :      *
     965             :      * For initial synchronization, row filtering can be ignored in the
     966             :      * following cases:
     967             :      *
     968             :      * 1) one of the subscribed publications for the table hasn't specified
     969             :      * any row filter
     970             :      *
     971             :      * 2) one of the subscribed publications has puballtables set to true
     972             :      *
     973             :      * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
     974             :      * that includes this relation
     975             :      */
     976         398 :     if (server_version >= 150000)
     977             :     {
     978             :         /* Reuse the already-built pub_names. */
     979             :         Assert(pub_names != NULL);
     980             : 
     981             :         /* Check for row filters. */
     982         398 :         resetStringInfo(&cmd);
     983         398 :         appendStringInfo(&cmd,
     984             :                          "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
     985             :                          "  FROM pg_publication p,"
     986             :                          "  LATERAL pg_get_publication_tables(p.pubname) gpt"
     987             :                          " WHERE gpt.relid = %u"
     988             :                          "   AND p.pubname IN ( %s )",
     989             :                          lrel->remoteid,
     990             :                          pub_names->data);
     991             : 
     992         398 :         res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
     993             : 
     994         398 :         if (res->status != WALRCV_OK_TUPLES)
     995           0 :             ereport(ERROR,
     996             :                     (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
     997             :                             nspname, relname, res->err)));
     998             : 
     999             :         /*
    1000             :          * Multiple row filter expressions for the same table will be combined
    1001             :          * by COPY using OR. If any of the filter expressions for this table
    1002             :          * are null, it means the whole table will be copied. In this case it
    1003             :          * is not necessary to construct a unified row filter expression at
    1004             :          * all.
    1005             :          */
    1006         398 :         slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    1007         428 :         while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    1008             :         {
    1009         406 :             Datum       rf = slot_getattr(slot, 1, &isnull);
    1010             : 
    1011         406 :             if (!isnull)
    1012          30 :                 *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
    1013             :             else
    1014             :             {
    1015             :                 /* Ignore filters and cleanup as necessary. */
    1016         376 :                 if (*qual)
    1017             :                 {
    1018           6 :                     list_free_deep(*qual);
    1019           6 :                     *qual = NIL;
    1020             :                 }
    1021         376 :                 break;
    1022             :             }
    1023             : 
    1024          30 :             ExecClearTuple(slot);
    1025             :         }
    1026         398 :         ExecDropSingleTupleTableSlot(slot);
    1027             : 
    1028         398 :         walrcv_clear_result(res);
    1029         398 :         destroyStringInfo(pub_names);
    1030             :     }
    1031             : 
    1032         398 :     pfree(cmd.data);
    1033         398 : }
    1034             : 
    1035             : /*
    1036             :  * Copy existing data of a table from publisher.
    1037             :  *
    1038             :  * Caller is responsible for locking the local relation.
    1039             :  */
    1040             : static void
    1041         398 : copy_table(Relation rel)
    1042             : {
    1043             :     LogicalRepRelMapEntry *relmapentry;
    1044             :     LogicalRepRelation lrel;
    1045         398 :     List       *qual = NIL;
    1046             :     WalRcvExecResult *res;
    1047             :     StringInfoData cmd;
    1048             :     CopyFromState cstate;
    1049             :     List       *attnamelist;
    1050             :     ParseState *pstate;
    1051         398 :     List       *options = NIL;
    1052         398 :     bool        gencol_published = false;
    1053             : 
    1054             :     /* Get the publisher relation info. */
    1055         398 :     fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
    1056         398 :                             RelationGetRelationName(rel), &lrel, &qual,
    1057             :                             &gencol_published);
    1058             : 
    1059             :     /* Put the relation into relmap. */
    1060         398 :     logicalrep_relmap_update(&lrel);
    1061             : 
    1062             :     /* Map the publisher relation to local one. */
    1063         398 :     relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
    1064             :     Assert(rel == relmapentry->localrel);
    1065             : 
    1066             :     /* Start copy on the publisher. */
    1067         394 :     initStringInfo(&cmd);
    1068             : 
    1069             :     /* Regular or partitioned table with no row filter or generated columns */
    1070         394 :     if ((lrel.relkind == RELKIND_RELATION || lrel.relkind == RELKIND_PARTITIONED_TABLE)
    1071         394 :         && qual == NIL && !gencol_published)
    1072             :     {
    1073         366 :         appendStringInfo(&cmd, "COPY %s",
    1074         366 :                          quote_qualified_identifier(lrel.nspname, lrel.relname));
    1075             : 
    1076             :         /* If the table has columns, then specify the columns */
    1077         366 :         if (lrel.natts)
    1078             :         {
    1079         364 :             appendStringInfoString(&cmd, " (");
    1080             : 
    1081             :             /*
    1082             :              * XXX Do we need to list the columns in all cases? Maybe we're
    1083             :              * replicating all columns?
    1084             :              */
    1085         980 :             for (int i = 0; i < lrel.natts; i++)
    1086             :             {
    1087         616 :                 if (i > 0)
    1088         252 :                     appendStringInfoString(&cmd, ", ");
    1089             : 
    1090         616 :                 appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
    1091             :             }
    1092             : 
    1093         364 :             appendStringInfoChar(&cmd, ')');
    1094             :         }
    1095             : 
    1096         366 :         appendStringInfoString(&cmd, " TO STDOUT");
    1097             :     }
    1098             :     else
    1099             :     {
    1100             :         /*
    1101             :          * For non-tables and tables with row filters, we need to do COPY
    1102             :          * (SELECT ...), but we can't just do SELECT * because we may need to
    1103             :          * copy only subset of columns including generated columns. For tables
    1104             :          * with any row filters, build a SELECT query with OR'ed row filters
    1105             :          * for COPY.
    1106             :          *
    1107             :          * We also need to use this same COPY (SELECT ...) syntax when
    1108             :          * generated columns are published, because copy of generated columns
    1109             :          * is not supported by the normal COPY.
    1110             :          */
    1111          28 :         appendStringInfoString(&cmd, "COPY (SELECT ");
    1112          70 :         for (int i = 0; i < lrel.natts; i++)
    1113             :         {
    1114          42 :             appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
    1115          42 :             if (i < lrel.natts - 1)
    1116          14 :                 appendStringInfoString(&cmd, ", ");
    1117             :         }
    1118             : 
    1119          28 :         appendStringInfoString(&cmd, " FROM ");
    1120             : 
    1121             :         /*
    1122             :          * For regular tables, make sure we don't copy data from a child that
    1123             :          * inherits the named table as those will be copied separately.
    1124             :          */
    1125          28 :         if (lrel.relkind == RELKIND_RELATION)
    1126          22 :             appendStringInfoString(&cmd, "ONLY ");
    1127             : 
    1128          28 :         appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
    1129             :         /* list of OR'ed filters */
    1130          28 :         if (qual != NIL)
    1131             :         {
    1132             :             ListCell   *lc;
    1133          22 :             char       *q = strVal(linitial(qual));
    1134             : 
    1135          22 :             appendStringInfo(&cmd, " WHERE %s", q);
    1136          24 :             for_each_from(lc, qual, 1)
    1137             :             {
    1138           2 :                 q = strVal(lfirst(lc));
    1139           2 :                 appendStringInfo(&cmd, " OR %s", q);
    1140             :             }
    1141          22 :             list_free_deep(qual);
    1142             :         }
    1143             : 
    1144          28 :         appendStringInfoString(&cmd, ") TO STDOUT");
    1145             :     }
    1146             : 
    1147             :     /*
    1148             :      * Prior to v16, initial table synchronization will use text format even
    1149             :      * if the binary option is enabled for a subscription.
    1150             :      */
    1151         394 :     if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
    1152         394 :         MySubscription->binary)
    1153             :     {
    1154          10 :         appendStringInfoString(&cmd, " WITH (FORMAT binary)");
    1155          10 :         options = list_make1(makeDefElem("format",
    1156             :                                          (Node *) makeString("binary"), -1));
    1157             :     }
    1158             : 
    1159         394 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
    1160         394 :     pfree(cmd.data);
    1161         394 :     if (res->status != WALRCV_OK_COPY_OUT)
    1162           0 :         ereport(ERROR,
    1163             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1164             :                  errmsg("could not start initial contents copy for table \"%s.%s\": %s",
    1165             :                         lrel.nspname, lrel.relname, res->err)));
    1166         394 :     walrcv_clear_result(res);
    1167             : 
    1168         394 :     copybuf = makeStringInfo();
    1169             : 
    1170         394 :     pstate = make_parsestate(NULL);
    1171         394 :     (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
    1172             :                                          NULL, false, false);
    1173             : 
    1174         394 :     attnamelist = make_copy_attnamelist(relmapentry);
    1175         394 :     cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
    1176             : 
    1177             :     /* Do the copy */
    1178         392 :     (void) CopyFrom(cstate);
    1179             : 
    1180         372 :     logicalrep_rel_close(relmapentry, NoLock);
    1181         372 : }
    1182             : 
    1183             : /*
    1184             :  * Determine the tablesync slot name.
    1185             :  *
    1186             :  * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
    1187             :  * on slot name length. We append system_identifier to avoid slot_name
    1188             :  * collision with subscriptions in other clusters. With the current scheme
    1189             :  * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 1 + 20 + '\0'), the maximum
    1190             :  * length of slot_name will be 50 characters (51 bytes with the terminator).
    1191             :  *
    1192             :  * The returned slot name is stored in the supplied buffer (syncslotname) with
    1193             :  * the given size (szslot); a longer name would be truncated by snprintf.
    1194             :  *
    1195             :  * Note: We don't use the subscription slot name as part of tablesync slot name
    1196             :  * because we are responsible for cleaning up these slots and it could become
    1197             :  * impossible to recalculate what name to clean up if the subscription slot name
    1198             :  * had changed.
    1199             :  */
    1200             : void
    1201         780 : ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
    1202             :                                 char *syncslotname, Size szslot)
    1203             : {
    1204         780 :     snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
    1205             :              relid, GetSystemIdentifier());
    1206         780 : }
    1207             : 
    1208             : /*
    1209             :  * Start syncing the table in the sync worker. If the table needs no sync
    1210             :  * (SYNCDONE, READY or UNKNOWN state), exit the worker without further action.
    1211             :  *
    1212             :  * *origin_startpos receives the slot-creation LSN, or the origin's saved
    1213             :  * progress when resuming from FINISHEDCOPY. The returned slot name is
    1214             :  * palloc'ed in current memory context.
    1215             :  */
    1216             : static char *
    1217         398 : LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
    1218             : {
    1219             :     char       *slotname;
    1220             :     char       *err;
    1221             :     char        relstate;
    1222             :     XLogRecPtr  relstate_lsn;
    1223             :     Relation    rel;
    1224             :     AclResult   aclresult;
    1225             :     WalRcvExecResult *res;
    1226             :     char        originname[NAMEDATALEN];
    1227             :     ReplOriginId originid;
    1228             :     UserContext ucxt;
    1229             :     bool        must_use_password;
    1230             :     bool        run_as_owner;
    1231             : 
    1232             :     /* Check the state of the table synchronization. */
    1233         398 :     StartTransactionCommand();
    1234         398 :     relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
    1235         398 :                                        MyLogicalRepWorker->relid,
    1236             :                                        &relstate_lsn);
    1237         398 :     CommitTransactionCommand();
    1238             : 
    1239             :     /* Must use a password if passwordrequired is set and owner isn't superuser. */
    1240         788 :     must_use_password = MySubscription->passwordrequired &&
    1241         390 :         !MySubscription->ownersuperuser;
    1242             : 
    1243         398 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
    1244         398 :     MyLogicalRepWorker->relstate = relstate;
    1245         398 :     MyLogicalRepWorker->relstate_lsn = relstate_lsn;
    1246         398 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
    1247             : 
    1248             :     /*
    1249             :      * If synchronization is already done or no longer necessary, exit now
    1250             :      * that we've updated shared memory state.
    1251             :      */
    1252         398 :     switch (relstate)
    1253             :     {
    1254           0 :         case SUBREL_STATE_SYNCDONE:
    1255             :         case SUBREL_STATE_READY:
    1256             :         case SUBREL_STATE_UNKNOWN:
    1257           0 :             FinishSyncWorker(); /* doesn't return */
    1258             :     }
    1259             : 
    1260             :     /* Calculate the name of the tablesync slot. */
    1261         398 :     slotname = (char *) palloc(NAMEDATALEN);
    1262         398 :     ReplicationSlotNameForTablesync(MySubscription->oid,
    1263         398 :                                     MyLogicalRepWorker->relid,
    1264             :                                     slotname,
    1265             :                                     NAMEDATALEN);
    1266             : 
    1267             :     /*
    1268             :      * Here we use the slot name instead of the subscription name as the
    1269             :      * application_name, so that it is different from the leader apply worker,
    1270             :      * so that synchronous replication can distinguish them.
    1271             :      */
    1272         398 :     LogRepWorkerWalRcvConn =
    1273         398 :         walrcv_connect(MySubscription->conninfo, true, true,
    1274             :                        must_use_password,
    1275             :                        slotname, &err);
    1276         398 :     if (LogRepWorkerWalRcvConn == NULL)
    1277           0 :         ereport(ERROR,
    1278             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1279             :                  errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
    1280             :                         MySubscription->name, err)));
    1281             : 
    1282             :     Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
    1283             :            MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
    1284             :            MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
    1285             : 
    1286             :     /* Assign the origin tracking record name. */
    1287         398 :     ReplicationOriginNameForLogicalRep(MySubscription->oid,
    1288         398 :                                        MyLogicalRepWorker->relid,
    1289             :                                        originname,
    1290             :                                        sizeof(originname));
    1291             : 
    1292         398 :     if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
    1293             :     {
    1294             :         /*
    1295             :          * We have previously errored out before finishing the copy so the
    1296             :          * replication slot might exist. We want to remove the slot if it
    1297             :          * already exists and proceed.
    1298             :          *
    1299             :          * XXX We could instead have tried to drop the slot when we failed last
    1300             :          * time, but for that we might need to clean up the copy state as it
    1301             :          * might be in the middle of fetching the rows. Also, if there was a
    1302             :          * network breakdown then the drop wouldn't have succeeded, so trying it
    1303             :          * on the next attempt seems like a better bet.
    1304             :          */
    1305          18 :         ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
    1306             :     }
    1307         380 :     else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
    1308             :     {
    1309             :         /*
    1310             :          * The COPY phase was previously done, but tablesync then crashed
    1311             :          * before it was able to finish normally.
    1312             :          */
    1313           0 :         StartTransactionCommand();
    1314             : 
    1315             :         /*
    1316             :          * The origin tracking name must already exist. It was created first
    1317             :          * time this tablesync was launched.
    1318             :          */
    1319           0 :         originid = replorigin_by_name(originname, false);
    1320           0 :         replorigin_session_setup(originid, 0);
    1321           0 :         replorigin_xact_state.origin = originid;
    1322           0 :         *origin_startpos = replorigin_session_get_progress(false);
    1323             : 
    1324           0 :         CommitTransactionCommand();
    1325             : 
    1326           0 :         goto copy_table_done;
    1327             :     }
    1328             : 
    1329         398 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
    1330         398 :     MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
    1331         398 :     MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
    1332         398 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
    1333             : 
    1334             :     /*
    1335             :      * Update the state, create the replication origin, and make them visible
    1336             :      * to others.
    1337             :      */
    1338         398 :     StartTransactionCommand();
    1339         398 :     UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
    1340         398 :                                MyLogicalRepWorker->relid,
    1341         398 :                                MyLogicalRepWorker->relstate,
    1342         398 :                                MyLogicalRepWorker->relstate_lsn,
    1343             :                                false);
    1344             : 
    1345             :     /*
    1346             :      * Create the replication origin in a separate transaction from the one
    1347             :      * that sets up the origin in shared memory. This prevents the risk that
    1348             :      * changes to the origin in shared memory cannot be rolled back if the
    1349             :      * transaction aborts.
    1350             :      */
    1351         398 :     originid = replorigin_by_name(originname, true);
    1352         398 :     if (!OidIsValid(originid))
    1353         380 :         originid = replorigin_create(originname);
    1354             : 
    1355         398 :     CommitTransactionCommand();
    1356         398 :     pgstat_report_stat(true);
    1357             : 
    1358         398 :     StartTransactionCommand();
    1359             : 
    1360             :     /*
    1361             :      * Use a standard write lock here. It might be better to disallow access
    1362             :      * to the table while it's being synchronized. But we don't want to block
    1363             :      * the main apply process from working and it has to open the relation in
    1364             :      * RowExclusiveLock when remapping remote relation id to local one.
    1365             :      */
    1366         398 :     rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
    1367             : 
    1368             :     /*
    1369             :      * Start a transaction in the remote node in REPEATABLE READ mode.  This
    1370             :      * ensures that both the replication slot we create (see below) and the
    1371             :      * COPY are consistent with each other.
    1372             :      */
    1373         398 :     res = walrcv_exec(LogRepWorkerWalRcvConn,
    1374             :                       "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
    1375             :                       0, NULL);
    1376         398 :     if (res->status != WALRCV_OK_COMMAND)
    1377           0 :         ereport(ERROR,
    1378             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1379             :                  errmsg("table copy could not start transaction on publisher: %s",
    1380             :                         res->err)));
    1381         398 :     walrcv_clear_result(res);
    1382             : 
    1383             :     /*
    1384             :      * Create a new permanent logical decoding slot. This slot will be used
    1385             :      * for the catchup phase after COPY is done, so tell it to use the
    1386             :      * snapshot to make the final data consistent.
    1387             :      */
    1388         398 :     walrcv_create_slot(LogRepWorkerWalRcvConn,
    1389             :                        slotname, false /* permanent */ , false /* two_phase */ ,
    1390             :                        MySubscription->failover,
    1391             :                        CRS_USE_SNAPSHOT, origin_startpos);
    1392             : 
    1393             :     /*
    1394             :      * Advance the origin to the LSN got from walrcv_create_slot and then set
    1395             :      * up the origin. The advancement is WAL logged for the purpose of
    1396             :      * recovery. Locks are to prevent the replication origin from vanishing
    1397             :      * while advancing.
    1398             :      *
    1399             :      * The purpose of doing these before the copy is to avoid doing the copy
    1400             :      * again due to any error in advancing or setting up origin tracking.
    1401             :      */
    1402         398 :     LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1403         398 :     replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
    1404             :                        true /* go backward */ , true /* WAL log */ );
    1405         398 :     UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1406             : 
    1407         398 :     replorigin_session_setup(originid, 0);
    1408         398 :     replorigin_xact_state.origin = originid;
    1409             : 
    1410             :     /*
    1411             :      * If the user did not opt to run as the owner of the subscription
    1412             :      * ('run_as_owner'), then copy the table as the owner of the table.
    1413             :      */
    1414         398 :     run_as_owner = MySubscription->runasowner;
    1415         398 :     if (!run_as_owner)
    1416         396 :         SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);
    1417             : 
    1418             :     /*
    1419             :      * Check that our table sync worker has permission to insert into the
    1420             :      * target table.
    1421             :      */
    1422         398 :     aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
    1423             :                                   ACL_INSERT);
    1424         398 :     if (aclresult != ACLCHECK_OK)
    1425           0 :         aclcheck_error(aclresult,
    1426           0 :                        get_relkind_objtype(rel->rd_rel->relkind),
    1427           0 :                        RelationGetRelationName(rel));
    1428             : 
    1429             :     /*
    1430             :      * COPY FROM does not honor RLS policies.  That is not a problem for
    1431             :      * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
    1432             :      * who has it implicitly), but other roles should not be able to
    1433             :      * circumvent RLS.  Disallow logical replication into RLS enabled
    1434             :      * relations for such roles.
    1435             :      */
    1436         398 :     if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
    1437           0 :         ereport(ERROR,
    1438             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1439             :                  errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
    1440             :                         GetUserNameFromId(GetUserId(), true),
    1441             :                         RelationGetRelationName(rel))));
    1442             : 
    1443             :     /* Now do the initial data copy */
    1444         398 :     PushActiveSnapshot(GetTransactionSnapshot());
    1445         398 :     copy_table(rel);
    1446         372 :     PopActiveSnapshot();
    1447             : 
    1448         372 :     res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
    1449         372 :     if (res->status != WALRCV_OK_COMMAND)
    1450           0 :         ereport(ERROR,
    1451             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1452             :                  errmsg("table copy could not finish transaction on publisher: %s",
    1453             :                         res->err)));
    1454         372 :     walrcv_clear_result(res);
    1455             : 
    1456         372 :     if (!run_as_owner)
    1457         370 :         RestoreUserContext(&ucxt);
    1458             : 
    1459         372 :     table_close(rel, NoLock);
    1460             : 
    1461             :     /* Make the copy visible. */
    1462         372 :     CommandCounterIncrement();
    1463             : 
    1464             :     /*
    1465             :      * Update the persisted state to indicate the COPY phase is done; make it
    1466             :      * visible to others.
    1467             :      */
    1468         372 :     UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
    1469         372 :                                MyLogicalRepWorker->relid,
    1470             :                                SUBREL_STATE_FINISHEDCOPY,
    1471         372 :                                MyLogicalRepWorker->relstate_lsn,
    1472             :                                false);
    1473             : 
    1474         372 :     CommitTransactionCommand();
    1475             : 
    1476         372 : copy_table_done:
    1477             : 
    1478         372 :     elog(DEBUG1,
    1479             :          "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X",
    1480             :          originname, LSN_FORMAT_ARGS(*origin_startpos));
    1481             : 
    1482             :     /*
    1483             :      * We are done with the initial data synchronization; set SYNCWAIT state.
    1484             :      */
    1485         372 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
    1486         372 :     MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
    1487         372 :     MyLogicalRepWorker->relstate_lsn = *origin_startpos;
    1488         372 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
    1489             : 
    1490             :     /*
    1491             :      * Finally, wait until the leader apply worker tells us to catch up and
    1492             :      * then return to let LogicalRepApplyLoop do it.
    1493             :      */
    1494         372 :     wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
    1495         372 :     return slotname;
    1496             : }
    1497             : 
    1498             : /*
    1499             :  * Execute the initial sync with error handling. On error, disable the
    1500             :  * subscription if disableonerr is set; otherwise report it and re-throw.
    1501             :  *
    1502             :  * On success, *slotname is a copy of the slot name in ApplyContext. Note that
    1503             :  * we don't handle FATAL errors, which are probably caused by system resource
    1504             :  * errors and are not repeatable.
    1505             :  */
    1506             : static void
    1507         398 : start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
    1508             : {
    1509         398 :     char       *sync_slotname = NULL;
    1510             : 
    1511             :     Assert(am_tablesync_worker());
    1512             : 
    1513         398 :     PG_TRY();
    1514             :     {
    1515             :         /* Call initial sync. */
    1516         398 :         sync_slotname = LogicalRepSyncTableStart(origin_startpos);
    1517             :     }
    1518          26 :     PG_CATCH();
    1519             :     {
    1520          26 :         if (MySubscription->disableonerr)
    1521           2 :             DisableSubscriptionAndExit();
    1522             :         else
    1523             :         {
    1524             :             /*
    1525             :              * Report the worker failed during table synchronization. Abort
    1526             :              * the current transaction so that the stats message is sent in an
    1527             :              * idle state.
    1528             :              */
    1529          24 :             AbortOutOfAnyTransaction();
    1530          24 :             pgstat_report_subscription_error(MySubscription->oid,
    1531             :                                              WORKERTYPE_TABLESYNC);
    1532             : 
    1533          24 :             PG_RE_THROW();
    1534             :         }
    1535             :     }
    1536         372 :     PG_END_TRY();
    1537             : 
    1538             :     /* copy the slot name into the long-lived ApplyContext, free the original */
    1539         372 :     *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
    1540         372 :     pfree(sync_slotname);
    1541         372 : }
    1542             : 
    1543             : /*
    1544             :  * Runs the tablesync worker.
    1545             :  *
    1546             :  * It performs the initial table sync. After a successful sync, it sets the
    1547             :  * streaming options and starts streaming to catch up with the apply worker.
    1548             :  */
    1549             : static void
    1550         398 : run_tablesync_worker(void)
    1551             : {
    1552             :     char        originname[NAMEDATALEN];
    1553         398 :     XLogRecPtr  origin_startpos = InvalidXLogRecPtr;
    1554         398 :     char       *slotname = NULL;
    1555             :     WalRcvStreamOptions options;
    1556             : 
    1557         398 :     start_table_sync(&origin_startpos, &slotname);
    1558             : 
    1559         372 :     ReplicationOriginNameForLogicalRep(MySubscription->oid,
    1560         372 :                                        MyLogicalRepWorker->relid,
    1561             :                                        originname,
    1562             :                                        sizeof(originname));
    1563             : 
    1564         372 :     set_apply_error_context_origin(originname);
    1565             : 
    1566         372 :     set_stream_options(&options, slotname, &origin_startpos);
    1567             : 
    1568         372 :     walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
    1569             : 
    1570             :     /* Apply the changes until we catch up with the apply worker. */
    1571         372 :     start_apply(origin_startpos);
    1572           0 : }
    1573             : 
    1574             : /* Logical Replication Tablesync worker entry point; main_arg is the worker slot */
    1575             : void
    1576         400 : TableSyncWorkerMain(Datum main_arg)
    1577             : {
    1578         400 :     int         worker_slot = DatumGetInt32(main_arg);
    1579             : 
    1580         400 :     SetupApplyOrSyncWorker(worker_slot);
    1581             : 
    1582         398 :     run_tablesync_worker();
    1583             : 
    1584           0 :     FinishSyncWorker();
    1585             : }
    1586             : 
    1587             : /*
    1588             :  * Return true if the subscription has tables and all tablesyncs are READY.
    1589             :  *
    1590             :  * Return false if it has no tables or if any table is not yet READY.
    1591             :  *
    1592             :  * Note: This function must not be called from outside of apply or tablesync
    1593             :  * workers, because MySubscription needs to be already initialized.
    1594             :  */
    1595             : bool
    1596         502 : AllTablesyncsReady(void)
    1597             : {
    1598             :     bool        started_tx;
    1599             :     bool        has_tables;
    1600             : 
    1601             :     /* We need up-to-date sync state info for subscription tables here. */
    1602         502 :     FetchRelationStates(&has_tables, NULL, &started_tx);
    1603             : 
    1604         502 :     if (started_tx)
    1605             :     {
    1606          32 :         CommitTransactionCommand();
    1607          32 :         pgstat_report_stat(true);
    1608             :     }
    1609             : 
    1610             :     /*
    1611             :      * Return false when there are no tables in subscription or not all tables
    1612             :      * are in ready state; true otherwise.
    1613             :      */
    1614         502 :     return has_tables && (table_states_not_ready == NIL);
    1615             : }
    1616             : 
    1617             : /*
    1618             :  * Return whether the subscription currently has any tables.
    1619             :  *
    1620             :  * Note: Unlike HasSubscriptionTables(), this function relies on the cached
    1621             :  * subscription table states obtained via FetchRelationStates(). It should
    1622             :  * not be invoked outside of apply or tablesync workers, as MySubscription
    1623             :  * must be initialized first.
    1624             :  */
    1625             : bool
    1626         340 : HasSubscriptionTablesCached(void)
    1627             : {
    1628             :     bool        started_tx;
    1629             :     bool        has_tables;
    1630             : 
    1631             :     /* We need up-to-date subscription tables info here. */
    1632         340 :     FetchRelationStates(&has_tables, NULL, &started_tx);
    1633             : 
    1634         340 :     if (started_tx)
    1635             :     {
    1636           0 :         CommitTransactionCommand();
    1637           0 :         pgstat_report_stat(true);
    1638             :     }
    1639             : 
    1640         340 :     return has_tables;
    1641             : }
    1642             : 
    1643             : /*
    1644             :  * Update the two_phase state of the specified subscription in pg_subscription.
                     :  *
                     :  * suboid identifies the pg_subscription row to modify.  new_state must be
                     :  * one of LOGICALREP_TWOPHASE_STATE_DISABLED, LOGICALREP_TWOPHASE_STATE_PENDING
                     :  * or LOGICALREP_TWOPHASE_STATE_ENABLED; this is checked with an Assert only.
                     :  *
                     :  * Only the subtwophasestate column is flagged in the replaces[] array passed
                     :  * to heap_modify_tuple(); no other column is marked for replacement.
                     :  *
                     :  * Raises ERROR if the syscache lookup finds no tuple for suboid.
                     :  *
                     :  * pg_subscription is opened and closed with RowExclusiveLock within this
                     :  * function.  Note that tup is reassigned to the tuple returned by
                     :  * heap_modify_tuple(), so the heap_freetuple() call releases that modified
                     :  * tuple; the copy obtained from SearchSysCacheCopy1() is not explicitly
                     :  * freed here.
    1645             :  */
    1646             : void
    1647          20 : UpdateTwoPhaseState(Oid suboid, char new_state)
    1648             : {
    1649             :     Relation    rel;
    1650             :     HeapTuple   tup;
    1651             :     bool        nulls[Natts_pg_subscription];
    1652             :     bool        replaces[Natts_pg_subscription];
    1653             :     Datum       values[Natts_pg_subscription];
    1654             : 
    1655             :     Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
    1656             :            new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
    1657             :            new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
    1658             : 
    1659          20 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    1660          20 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
    1661          20 :     if (!HeapTupleIsValid(tup))
    1662           0 :         elog(ERROR,
    1663             :              "cache lookup failed for subscription oid %u",
    1664             :              suboid);
    1665             : 
    1666             :     /* Form a new tuple. */
    1667          20 :     memset(values, 0, sizeof(values));
    1668          20 :     memset(nulls, false, sizeof(nulls));
    1669          20 :     memset(replaces, false, sizeof(replaces));
    1670             : 
    1671             :     /* And update/set two_phase state */
    1672          20 :     values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
    1673          20 :     replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
    1674             : 
    1675          20 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel),
    1676             :                             values, nulls, replaces);
    1677          20 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    1678             : 
    1679          20 :     heap_freetuple(tup);
    1680          20 :     table_close(rel, RowExclusiveLock);
    1681          20 : }

Generated by: LCOV version 1.16