LCOV - code coverage report
Current view: top level - src/backend/replication/logical - tablesync.c (source / functions) Hit Total Coverage
Test: PostgreSQL 15devel Lines: 344 378 91.0 %
Date: 2021-12-09 04:09:06 Functions: 17 17 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * tablesync.c
       3             :  *    PostgreSQL logical replication: initial table data synchronization
       4             :  *
       5             :  * Copyright (c) 2012-2021, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/tablesync.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file contains code for initial table data synchronization for
      12             :  *    logical replication.
      13             :  *
      14             :  *    The initial data synchronization is done separately for each table,
      15             :  *    in a separate apply worker that only fetches the initial snapshot data
      16             :  *    from the publisher and then synchronizes the position in the stream with
      17             :  *    the main apply worker.
      18             :  *
      19             :  *    There are several reasons for doing the synchronization this way:
      20             :  *     - It allows us to parallelize the initial data synchronization
      21             :  *       which lowers the time needed for it to happen.
      22             :  *     - The initial synchronization does not have to hold the xid and LSN
      23             :  *       for the time it takes to copy data of all tables, causing less
      24             :  *       bloat and lower disk consumption compared to doing the
      25             :  *       synchronization in a single process for the whole database.
      26             :  *     - It allows us to synchronize any tables added after the initial
      27             :  *       synchronization has finished.
      28             :  *
      29             :  *    The stream position synchronization works in multiple steps:
      30             :  *     - Apply worker requests a tablesync worker to start, setting the new
      31             :  *       table state to INIT.
      32             :  *     - Tablesync worker starts; changes table state from INIT to DATASYNC while
      33             :  *       copying.
      34             :  *     - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
      35             :  *       worker specific) state to indicate when the copy phase has completed, so
      36             :  *       if the worker crashes with this (non-memory) state then the copy will not
      37             :  *       be re-attempted.
      38             :  *     - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
      39             :  *     - Apply worker periodically checks for tables in SYNCWAIT state.  When
      40             :  *       any appear, it sets the table state to CATCHUP and starts loop-waiting
      41             :  *       until either the table state is set to SYNCDONE or the sync worker
      42             :  *       exits.
      43             :  *     - After the sync worker has seen the state change to CATCHUP, it will
      44             :  *       read the stream and apply changes (acting like an apply worker) until
      45             :  *       it catches up to the specified stream position.  Then it sets the
      46             :  *       state to SYNCDONE.  There might be zero changes applied between
      47             :  *       CATCHUP and SYNCDONE, because the sync worker might be ahead of the
      48             :  *       apply worker.
      49             :  *     - Once the state is set to SYNCDONE, the apply will continue tracking
      50             :  *       the table until it reaches the SYNCDONE stream position, at which
      51             :  *       point it sets state to READY and stops tracking.  Again, there might
      52             :  *       be zero changes in between.
      53             :  *
      54             :  *    So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
      55             :  *    -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
      56             :  *
      57             :  *    The catalog pg_subscription_rel is used to keep information about
      58             :  *    subscribed tables and their state.  The catalog holds all states
      59             :  *    except SYNCWAIT and CATCHUP which are only in shared memory.
      60             :  *
      61             :  *    Example flows look like this:
      62             :  *     - Apply is in front:
      63             :  *        sync:8
      64             :  *          -> set in catalog FINISHEDCOPY
      65             :  *          -> set in memory SYNCWAIT
      66             :  *        apply:10
      67             :  *          -> set in memory CATCHUP
      68             :  *          -> enter wait-loop
      69             :  *        sync:10
      70             :  *          -> set in catalog SYNCDONE
      71             :  *          -> exit
      72             :  *        apply:10
      73             :  *          -> exit wait-loop
      74             :  *          -> continue rep
      75             :  *        apply:11
      76             :  *          -> set in catalog READY
      77             :  *
      78             :  *     - Sync is in front:
      79             :  *        sync:10
      80             :  *          -> set in catalog FINISHEDCOPY
      81             :  *          -> set in memory SYNCWAIT
      82             :  *        apply:8
      83             :  *          -> set in memory CATCHUP
      84             :  *          -> continue per-table filtering
      85             :  *        sync:10
      86             :  *          -> set in catalog SYNCDONE
      87             :  *          -> exit
      88             :  *        apply:10
      89             :  *          -> set in catalog READY
      90             :  *          -> stop per-table filtering
      91             :  *          -> continue rep
      92             :  *-------------------------------------------------------------------------
      93             :  */
      94             : 
      95             : #include "postgres.h"
      96             : 
      97             : #include "access/table.h"
      98             : #include "access/xact.h"
      99             : #include "catalog/indexing.h"
     100             : #include "catalog/pg_subscription_rel.h"
     101             : #include "catalog/pg_type.h"
     102             : #include "commands/copy.h"
     103             : #include "miscadmin.h"
     104             : #include "parser/parse_relation.h"
     105             : #include "pgstat.h"
     106             : #include "replication/logicallauncher.h"
     107             : #include "replication/logicalrelation.h"
     108             : #include "replication/walreceiver.h"
     109             : #include "replication/worker_internal.h"
     110             : #include "replication/slot.h"
     111             : #include "replication/origin.h"
     112             : #include "storage/ipc.h"
     113             : #include "storage/lmgr.h"
     114             : #include "utils/builtins.h"
     115             : #include "utils/lsyscache.h"
     116             : #include "utils/memutils.h"
     117             : #include "utils/snapmgr.h"
     118             : #include "utils/syscache.h"
     119             : 
     120             : static bool table_states_valid = false;	/* cleared by invalidate_syncing_table_states() */
     121             : static List *table_states_not_ready = NIL;	/* SubscriptionRelState entries walked by process_syncing_tables_for_apply() */
     122             : static bool FetchTableStates(bool *started_tx);	/* forward declaration; called by process_syncing_tables_for_apply() */
     123             : 
     124             : StringInfo  copybuf = NULL;	/* NOTE(review): not used in the visible part of this file; presumably the COPY data buffer -- confirm */
     125             : 
     126             : /*
     127             :  * Exit routine for synchronization worker.
                     :  *
                     :  * Sequence: commit any open transaction (and report stats), flush
                     :  * everything up to GetXLogWriteRecPtr(), emit the "has finished" LOG
                     :  * message inside a short transaction of its own, wake the apply worker
                     :  * of this subscription, and exit the process with status 0.
                     :  *
                     :  * This function never returns (pg_attribute_noreturn, proc_exit).
                     :  *
                     :  * NOTE(review): the transaction around ereport() is presumably there
                     :  * because get_rel_name() needs catalog access -- confirm.
     128             :  */
     129             : static void
     130             : pg_attribute_noreturn()
     131         166 : finish_sync_worker(void)
     132             : {
     133             :     /*
     134             :      * Commit any outstanding transaction. This is the usual case, unless
     135             :      * there was nothing to do for the table.
     136             :      */
     137         166 :     if (IsTransactionState())
     138             :     {
     139         166 :         CommitTransactionCommand();
     140         166 :         pgstat_report_stat(false);
     141             :     }
     142             : 
     143             :     /* And flush all writes. */
     144         166 :     XLogFlush(GetXLogWriteRecPtr());
     145             : 
     146         166 :     StartTransactionCommand();
     147         166 :     ereport(LOG,
     148             :             (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
     149             :                     MySubscription->name,
     150             :                     get_rel_name(MyLogicalRepWorker->relid))));
     151         166 :     CommitTransactionCommand();
     152             : 
     153             :     /* Find the main apply worker (relid = InvalidOid) and signal it. */
     154         166 :     logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
     155             : 
     156             :     /* Stop gracefully */
     157         166 :     proc_exit(0);
     158             : }
     159             : 
     160             : /*
     161             :  * Wait until the relation sync state is set in the catalog to the expected
     162             :  * one; return true when it happens.
     163             :  *
     164             :  * Returns false if the table sync worker or the table itself have
     165             :  * disappeared, or the table state has been reset.
     166             :  *
     167             :  * Currently, this is used in the apply worker when transitioning from
     168             :  * CATCHUP state to SYNCDONE.
                     :  *
                     :  * relid: table whose catalog sync state is polled.
                     :  * expected_state: state value to wait for.
                     :  *
                     :  * Each iteration invalidates the catalog snapshot and re-reads the
                     :  * state with GetSubscriptionRelState().  The loop gives up when that
                     :  * returns SUBREL_STATE_UNKNOWN or when logicalrep_worker_find() finds
                     :  * no sync worker for the table; otherwise it sleeps on MyLatch with a
                     :  * 1000 ms timeout and resets the latch before retrying.  The LSN
                     :  * fetched together with the state (statelsn) is not used.
     169             :  */
     170             : static bool
     171         324 : wait_for_relation_state_change(Oid relid, char expected_state)
     172             : {
     173             :     char        state;
     174             : 
     175             :     for (;;)
     176         162 :     {
     177             :         LogicalRepWorker *worker;
     178             :         XLogRecPtr  statelsn;
     179             : 
     180         324 :         CHECK_FOR_INTERRUPTS();
     181             : 
     182         324 :         InvalidateCatalogSnapshot();
     183         324 :         state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
     184             :                                         relid, &statelsn);
     185             : 
     186         324 :         if (state == SUBREL_STATE_UNKNOWN)
     187           0 :             break;
     188             : 
     189         324 :         if (state == expected_state)
     190         162 :             return true;
     191             : 
     192             :         /* Check if the sync worker is still running and bail if not. */
     193         162 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     194         162 :         worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
     195             :                                         false);
     196         162 :         LWLockRelease(LogicalRepWorkerLock);
     197         162 :         if (!worker)
     198           0 :             break;
     199             : 
     200         162 :         (void) WaitLatch(MyLatch,
     201             :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     202             :                          1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
     203             : 
     204         162 :         ResetLatch(MyLatch);
     205             :     }
     206             : 
     207           0 :     return false;
     208             : }
     209             : 
     210             : /*
     211             :  * Wait until the apply worker changes the state of our synchronization
     212             :  * worker to the expected one.
     213             :  *
     214             :  * Used when transitioning from SYNCWAIT state to CATCHUP.
     215             :  *
     216             :  * Returns false if the apply worker has disappeared.
                     :  *
                     :  * expected_state: value of MyLogicalRepWorker->relstate to wait for.
                     :  *
                     :  * Each iteration reads our own relstate without a lock, then looks up
                     :  * the apply worker (relid = InvalidOid) under LogicalRepWorkerLock and
                     :  * wakes it if it has a proc.  If no apply worker is found the function
                     :  * returns false; otherwise it sleeps on MyLatch with a 1000 ms
                     :  * timeout.  The latch is reset only when WaitLatch() reports
                     :  * WL_LATCH_SET.
     217             :  */
     218             : static bool
     219         332 : wait_for_worker_state_change(char expected_state)
     220             : {
     221             :     int         rc;
     222             : 
     223             :     for (;;)
     224         166 :     {
     225             :         LogicalRepWorker *worker;
     226             : 
     227         332 :         CHECK_FOR_INTERRUPTS();
     228             : 
     229             :         /*
     230             :          * Done if already in correct state.  (We assume this fetch is atomic
     231             :          * enough to not give a misleading answer if we do it with no lock.)
     232             :          */
     233         332 :         if (MyLogicalRepWorker->relstate == expected_state)
     234         166 :             return true;
     235             : 
     236             :         /*
     237             :          * Bail out if the apply worker has died, else signal it we're
     238             :          * waiting.
     239             :          */
     240         166 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     241         166 :         worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
     242             :                                         InvalidOid, false);
     243         166 :         if (worker && worker->proc)
     244         166 :             logicalrep_worker_wakeup_ptr(worker);
     245         166 :         LWLockRelease(LogicalRepWorkerLock);
     246         166 :         if (!worker)
     247           0 :             break;
     248             : 
     249             :         /*
     250             :          * Wait.  We expect to get a latch signal back from the apply worker,
     251             :          * but use a timeout in case it dies without sending one.
     252             :          */
     253         166 :         rc = WaitLatch(MyLatch,
     254             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     255             :                        1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
     256             : 
     257         166 :         if (rc & WL_LATCH_SET)
     258         166 :             ResetLatch(MyLatch);
     259             :     }
     260             : 
     261           0 :     return false;
     262             : }
     263             : 
     264             : /*
     265             :  * Callback from syscache invalidation.
                     :  *
                     :  * Marks the cached table sync states as stale by clearing
                     :  * table_states_valid.  None of the arguments (arg, cacheid, hashvalue)
                     :  * is used.  The function is not static; where it gets registered as a
                     :  * callback is not visible in this part of the file.
     266             :  */
     267             : void
     268        1162 : invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
     269             : {
     270        1162 :     table_states_valid = false;
     271        1162 : }
     272             : 
     273             : /*
     274             :  * Handle table synchronization cooperation from the synchronization
     275             :  * worker.
     276             :  *
     277             :  * If the sync worker is in CATCHUP state and reached (or passed) the
     278             :  * predetermined synchronization point in the WAL stream, mark the table as
     279             :  * SYNCDONE and finish.
                     :  *
                     :  * current_lsn: stream position reached by the caller; it is compared
                     :  * against MyLogicalRepWorker->relstate_lsn and, on success, recorded as
                     :  * the SYNCDONE position.
                     :  *
                     :  * When the condition holds, the steps are, in order: update the shared
                     :  * worker state under relmutex, persist the state with
                     :  * UpdateSubscriptionRelState(), end streaming, drop the tablesync slot
                     :  * on the publisher, and call finish_sync_worker(), which does not
                     :  * return.  Otherwise the spinlock is released and the function returns
                     :  * without further action.
     280             :  */
     281             : static void
     282         170 : process_syncing_tables_for_sync(XLogRecPtr current_lsn)
     283             : {
     284         170 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
     285             : 
     286         170 :     if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
     287         170 :         current_lsn >= MyLogicalRepWorker->relstate_lsn)
     288             :     {
     289             :         TimeLineID  tli;
     290         166 :         char        syncslotname[NAMEDATALEN] = {0};
     291             : 
     292         166 :         MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
     293         166 :         MyLogicalRepWorker->relstate_lsn = current_lsn;
     294             : 
     295         166 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
     296             : 
     297             :         /*
     298             :          * UpdateSubscriptionRelState must be called within a transaction.
     299             :          * That transaction will be ended within the finish_sync_worker().
     300             :          */
     301         166 :         if (!IsTransactionState())
     302         166 :             StartTransactionCommand();
     303             : 
     304         166 :         UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
     305         166 :                                    MyLogicalRepWorker->relid,
     306         166 :                                    MyLogicalRepWorker->relstate,
     307         166 :                                    MyLogicalRepWorker->relstate_lsn);
     308             : 
     309             :         /*
     310             :          * End streaming so that LogRepWorkerWalRcvConn can be used to drop
     311             :          * the slot.
     312             :          */
     313         166 :         walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
     314             : 
     315             :         /*
     316             :          * Cleanup the tablesync slot.
     317             :          *
     318             :          * This has to be done after updating the state because otherwise if
     319             :          * there is an error while doing the database operations we won't be
     320             :          * able to roll back the dropped slot.
     321             :          */
     322         166 :         ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
     323         166 :                                         MyLogicalRepWorker->relid,
     324             :                                         syncslotname,
     325             :                                         sizeof(syncslotname));
     326             : 
     327             :         /*
     328             :          * It is important to give an error if we are unable to drop the slot,
     329             :          * otherwise, it won't be dropped till the corresponding subscription
     330             :          * is dropped. So passing missing_ok = false.
     331             :          */
     332         166 :         ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
     333             : 
     334         166 :         finish_sync_worker();
     335             :     }
     336             :     else
     337           4 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
     338           4 : }
     339             : 
     340             : /*
     341             :  * Handle table synchronization cooperation from the apply worker.
     342             :  *
     343             :  * Walk over all subscription tables that are individually tracked by the
     344             :  * apply process (currently, all that have state other than
     345             :  * SUBREL_STATE_READY) and manage synchronization for them.
     346             :  *
     347             :  * If there are tables that need synchronizing and are not being synchronized
     348             :  * yet, start sync workers for them (if there are free slots for sync
     349             :  * workers).  To prevent starting the sync worker for the same relation at a
     350             :  * high frequency after a failure, we store its last start time with each sync
     351             :  * state info.  We start the sync worker for the same relation after waiting
     352             :  * at least wal_retrieve_retry_interval.
     353             :  *
     354             :  * For tables that are being synchronized already, check if sync workers
     355             :  * either need action from the apply worker or have finished.  This is the
     356             :  * SYNCWAIT to CATCHUP transition.
     357             :  *
     358             :  * If the synchronization position is reached (SYNCDONE), then the table can
     359             :  * be marked as READY and is no longer tracked.
     360             :  */
     361             : static void
     362       11160 : process_syncing_tables_for_apply(XLogRecPtr current_lsn)
     363             : {
     364             :     struct tablesync_start_time_mapping
     365             :     {
     366             :         Oid         relid;
     367             :         TimestampTz last_start_time;
     368             :     };
     369             :     static HTAB *last_start_times = NULL;
     370             :     ListCell   *lc;
     371       11160 :     bool        started_tx = false;
     372             : 
     373             :     Assert(!IsTransactionState());
     374             : 
     375             :     /* We need up-to-date sync state info for subscription tables here. */
     376       11160 :     FetchTableStates(&started_tx);
     377             : 
     378             :     /*
     379             :      * Prepare a hash table for tracking last start times of workers, to avoid
     380             :      * immediate restarts.  We don't need it if there are no tables that need
     381             :      * syncing.
     382             :      */
     383       11160 :     if (table_states_not_ready && !last_start_times)
     384          90 :     {
     385             :         HASHCTL     ctl;
     386             : 
     387          90 :         ctl.keysize = sizeof(Oid);
     388          90 :         ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
     389          90 :         last_start_times = hash_create("Logical replication table sync worker start times",
     390             :                                        256, &ctl, HASH_ELEM | HASH_BLOBS);
     391             :     }
     392             : 
     393             :     /*
     394             :      * Clean up the hash table when we're done with all tables (just to
     395             :      * release the bit of memory).
     396             :      */
     397       11070 :     else if (!table_states_not_ready && last_start_times)
     398             :     {
     399          72 :         hash_destroy(last_start_times);
     400          72 :         last_start_times = NULL;
     401             :     }
     402             : 
     403             :     /*
     404             :      * Even when the two_phase mode is requested by the user, it remains as
     405             :      * 'pending' until all tablesyncs have reached READY state.
     406             :      *
     407             :      * When this happens, we restart the apply worker and (if the conditions
     408             :      * are still ok) then the two_phase tri-state will become 'enabled' at
     409             :      * that time.
     410             :      *
     411             :      * Note: If the subscription has no tables then leave the state as
     412             :      * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
     413             :      * work.
     414             :      */
     415       11216 :     if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
     416          56 :         AllTablesyncsReady())
     417             :     {
     418           8 :         ereport(LOG,
     419             :                 (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
     420             :                         MySubscription->name)));
     421             : 
     422           8 :         proc_exit(0);
     423             :     }
     424             : 
     425             :     /*
     426             :      * Process all tables that are being synchronized.
     427             :      */
     428       12402 :     foreach(lc, table_states_not_ready)
     429             :     {
     430        1250 :         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
     431             : 
     432        1250 :         if (rstate->state == SUBREL_STATE_SYNCDONE)
     433             :         {
     434             :             /*
     435             :              * Apply has caught up to the position where the table sync has
     436             :              * finished.  Mark the table as ready so that the apply will just
     437             :              * continue to replicate it normally.
     438             :              */
     439         164 :             if (current_lsn >= rstate->lsn)
     440             :             {
     441             :                 char        originname[NAMEDATALEN];
     442             : 
     443         162 :                 rstate->state = SUBREL_STATE_READY;
     444         162 :                 rstate->lsn = current_lsn;
     445         162 :                 if (!started_tx)
     446             :                 {
     447           2 :                     StartTransactionCommand();
     448           2 :                     started_tx = true;
     449             :                 }
     450             : 
     451             :                 /*
     452             :                  * Remove the tablesync origin tracking if exists.
     453             :                  *
     454             :                  * The normal case origin drop is done here instead of in the
     455             :                  * process_syncing_tables_for_sync function because the origin
     456             :                  * cannot be dropped while the process owning it is still
     457             :                  * alive.
     458             :                  *
     459             :                  * There is a chance that the user is concurrently performing
     460             :                  * refresh for the subscription where we remove the table
     461             :                  * state and its origin and by this time the origin might be
     462             :                  * already removed. So passing missing_ok = true.
     463             :                  */
     464         162 :                 ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
     465             :                                                   rstate->relid,
     466             :                                                   originname,
     467             :                                                   sizeof(originname));
     468         162 :                 replorigin_drop_by_name(originname, true, false);
     469             : 
     470             :                 /*
     471             :                  * Update the state to READY only after the origin cleanup.
     472             :                  */
     473         162 :                 UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
     474         162 :                                            rstate->relid, rstate->state,
     475             :                                            rstate->lsn);
     476             :             }
     477             :         }
     478             :         else
     479             :         {
     480             :             LogicalRepWorker *syncworker;
     481             : 
     482             :             /*
     483             :              * Look for a sync worker for this relation.
     484             :              */
     485        1086 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     486             : 
     487        1086 :             syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
     488             :                                                 rstate->relid, false);
     489             : 
     490        1086 :             if (syncworker)
     491             :             {
     492             :                 /* Found one, update our copy of its state */
     493         448 :                 SpinLockAcquire(&syncworker->relmutex);
     494         448 :                 rstate->state = syncworker->relstate;
     495         448 :                 rstate->lsn = syncworker->relstate_lsn;
     496         448 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
     497             :                 {
     498             :                     /*
     499             :                      * Sync worker is waiting for apply.  Tell sync worker it
     500             :                      * can catchup now.
     501             :                      */
     502         162 :                     syncworker->relstate = SUBREL_STATE_CATCHUP;
     503         162 :                     syncworker->relstate_lsn =
     504         162 :                         Max(syncworker->relstate_lsn, current_lsn);
     505             :                 }
     506         448 :                 SpinLockRelease(&syncworker->relmutex);
     507             : 
     508             :                 /* If we told worker to catch up, wait for it. */
     509         448 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
     510             :                 {
     511             :                     /* Signal the sync worker, as it may be waiting for us. */
     512         162 :                     if (syncworker->proc)
     513         162 :                         logicalrep_worker_wakeup_ptr(syncworker);
     514             : 
     515             :                     /* Now safe to release the LWLock */
     516         162 :                     LWLockRelease(LogicalRepWorkerLock);
     517             : 
     518             :                     /*
     519             :                      * Enter busy loop and wait for synchronization worker to
     520             :                      * reach expected state (or die trying).
     521             :                      */
     522         162 :                     if (!started_tx)
     523             :                     {
     524           0 :                         StartTransactionCommand();
     525           0 :                         started_tx = true;
     526             :                     }
     527             : 
     528         162 :                     wait_for_relation_state_change(rstate->relid,
     529             :                                                    SUBREL_STATE_SYNCDONE);
     530             :                 }
     531             :                 else
     532         286 :                     LWLockRelease(LogicalRepWorkerLock);
     533             :             }
     534             :             else
     535             :             {
     536             :                 /*
     537             :                  * If there is no sync worker for this table yet, count
     538             :                  * running sync workers for this subscription, while we have
     539             :                  * the lock.
     540             :                  */
     541             :                 int         nsyncworkers =
     542         638 :                 logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
     543             : 
     544             :                 /* Now safe to release the LWLock */
     545         638 :                 LWLockRelease(LogicalRepWorkerLock);
     546             : 
     547             :                 /*
     548             :                  * If there are free sync worker slot(s), start a new sync
     549             :                  * worker for the table.
     550             :                  */
     551         638 :                 if (nsyncworkers < max_sync_workers_per_subscription)
     552             :                 {
     553         184 :                     TimestampTz now = GetCurrentTimestamp();
     554             :                     struct tablesync_start_time_mapping *hentry;
     555             :                     bool        found;
     556             : 
     557         184 :                     hentry = hash_search(last_start_times, &rstate->relid,
     558             :                                          HASH_ENTER, &found);
     559             : 
     560         202 :                     if (!found ||
     561          18 :                         TimestampDifferenceExceeds(hentry->last_start_time, now,
     562             :                                                    wal_retrieve_retry_interval))
     563             :                     {
     564         174 :                         logicalrep_worker_launch(MyLogicalRepWorker->dbid,
     565         174 :                                                  MySubscription->oid,
     566         174 :                                                  MySubscription->name,
     567         174 :                                                  MyLogicalRepWorker->userid,
     568             :                                                  rstate->relid);
     569         174 :                         hentry->last_start_time = now;
     570             :                     }
     571             :                 }
     572             :             }
     573             :         }
     574             :     }
     575             : 
     576       11152 :     if (started_tx)
     577             :     {
     578         650 :         CommitTransactionCommand();
     579         650 :         pgstat_report_stat(false);
     580             :     }
     581       11152 : }
     582             : 
     583             : /*
     584             :  * Process possible state change(s) of tables that are being synchronized: current_lsn is handed to the _for_sync variant in a tablesync worker and to the _for_apply variant otherwise.
     585             :  */
     586             : void
     587       11330 : process_syncing_tables(XLogRecPtr current_lsn)
     588             : {
     589       11330 :     if (am_tablesync_worker())
     590         170 :         process_syncing_tables_for_sync(current_lsn);
     591             :     else
     592       11160 :         process_syncing_tables_for_apply(current_lsn);
     593       11156 : }
     594             : 
     595             : /*
     596             :  * Create the list of column names for COPY from the logical relation mapping: one String node per rel->remoterel.attnames[] entry, in array order (NIL when natts is 0).
     597             :  */
     598             : static List *
     599         176 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
     600             : {
     601         176 :     List       *attnamelist = NIL;
     602             :     int         i;
     603             : 
     604         464 :     for (i = 0; i < rel->remoterel.natts; i++)
     605             :     {
     606         288 :         attnamelist = lappend(attnamelist,
     607         288 :                               makeString(rel->remoterel.attnames[i]));
     608             :     }
     609             : 
     610             : 
     611         176 :     return attnamelist;
     612             : }
     613             : 
     614             : /*
     615             :  * Data source callback for the COPY FROM, which reads from the remote connection and copies the data into
     616             :  * outbuf for our local COPY. Returns the number of bytes stored, which never exceeds maxread.
     617             :  */
     618             : static int
     619       26748 : copy_read_data(void *outbuf, int minread, int maxread)
     620             : {
     621       26748 :     int         bytesread = 0;
     622             :     int         avail;
     623             : 
     624             :     /* Use leftover data from a previous read first. NOTE(review): outbuf is not advanced here, unlike in the loop below. */
     625       26748 :     avail = copybuf->len - copybuf->cursor;
     626       26748 :     if (avail)
     627             :     {
     628           0 :         if (avail > maxread)
     629           0 :             avail = maxread;
     630           0 :         memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
     631           0 :         copybuf->cursor += avail;
     632           0 :         maxread -= avail;
     633           0 :         bytesread += avail;
     634             :     }
     635             : 
     636       26750 :     while (maxread > 0 && bytesread < minread)
     637             :     {
     638       26750 :         pgsocket    fd = PGINVALID_SOCKET;
     639             :         int         len;
     640       26750 :         char       *buf = NULL;
     641             : 
     642             :         for (;;)
     643             :         {
     644             :             /* Try to read the data. */
     645       26750 :             len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
     646             : 
     647       26750 :             CHECK_FOR_INTERRUPTS();
     648             : 
     649       26750 :             if (len == 0)
     650           2 :                 break;  /* leave the inner loop and wait on the socket/latch below */
     651       26748 :             else if (len < 0)
     652       26748 :                 return bytesread;
     653             :             else
     654             :             {
     655             :                 /* Make buf the current copybuf contents and hand over as much of it as fits into outbuf. */
     656       26572 :                 copybuf->data = buf;
     657       26572 :                 copybuf->len = len;
     658       26572 :                 copybuf->cursor = 0;
     659             : 
     660       26572 :                 avail = copybuf->len - copybuf->cursor;
     661       26572 :                 if (avail > maxread)
     662           0 :                     avail = maxread;
     663       26572 :                 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
     664       26572 :                 outbuf = (void *) ((char *) outbuf + avail);
     665       26572 :                 copybuf->cursor += avail;
     666       26572 :                 maxread -= avail;
     667       26572 :                 bytesread += avail;
     668             :             }
     669             : 
     670       26572 :             if (maxread <= 0 || bytesread >= minread)
     671       26572 :                 return bytesread;
     672             :         }
     673             : 
     674             :         /*
     675             :          * Wait for more data on the socket, for the latch to be set, or for the timeout (1000L) to expire.
     676             :          */
     677           2 :         (void) WaitLatchOrSocket(MyLatch,
     678             :                                  WL_SOCKET_READABLE | WL_LATCH_SET |
     679             :                                  WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     680             :                                  fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
     681             : 
     682           2 :         ResetLatch(MyLatch);
     683             :     }
     684             : 
     685           0 :     return bytesread;
     686             : }
     687             : 
     688             : 
     689             : /*
     690             :  * Fetch information about the remote relation nspname.relname from the publisher into *lrel, in a fashion
     691             :  * similar to what the RELATION message provides during replication.
     692             :  */
     693             : static void
     694         176 : fetch_remote_table_info(char *nspname, char *relname,
     695             :                         LogicalRepRelation *lrel)
     696             : {
     697             :     WalRcvExecResult *res;
     698             :     StringInfoData cmd;
     699             :     TupleTableSlot *slot;
     700         176 :     Oid         tableRow[] = {OIDOID, CHAROID, CHAROID};
     701         176 :     Oid         attrRow[] = {TEXTOID, OIDOID, BOOLOID};
     702             :     bool        isnull;
     703             :     int         natt;
     704             : 
     705         176 :     lrel->nspname = nspname;
     706         176 :     lrel->relname = relname;
     707             : 
     708             :     /* First fetch Oid, replica identity and relkind. */
     709         176 :     initStringInfo(&cmd);
     710         176 :     appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
     711             :                      "  FROM pg_catalog.pg_class c"
     712             :                      "  INNER JOIN pg_catalog.pg_namespace n"
     713             :                      "        ON (c.relnamespace = n.oid)"
     714             :                      " WHERE n.nspname = %s"
     715             :                      "   AND c.relname = %s",
     716             :                      quote_literal_cstr(nspname),
     717             :                      quote_literal_cstr(relname));
     718         176 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
     719             :                       lengthof(tableRow), tableRow);
     720             : 
     721         176 :     if (res->status != WALRCV_OK_TUPLES)
     722           0 :         ereport(ERROR,
     723             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
     724             :                  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
     725             :                         nspname, relname, res->err)));
     726             : 
     727         176 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     728         176 :     if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     729           0 :         ereport(ERROR,
     730             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     731             :                  errmsg("table \"%s.%s\" not found on publisher",
     732             :                         nspname, relname)));
     733             : 
     734         176 :     lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
     735             :     Assert(!isnull);
     736         176 :     lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
     737             :     Assert(!isnull);
     738         176 :     lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
     739             :     Assert(!isnull);
     740             : 
     741         176 :     ExecDropSingleTupleTableSlot(slot);
     742         176 :     walrcv_clear_result(res);
     743             : 
     744             :     /* Now fetch columns: name, type Oid, and whether the column is part of the replica identity index. */
     745         176 :     resetStringInfo(&cmd);
     746         176 :     appendStringInfo(&cmd,
     747             :                      "SELECT a.attname,"
     748             :                      "       a.atttypid,"
     749             :                      "       a.attnum = ANY(i.indkey)"
     750             :                      "  FROM pg_catalog.pg_attribute a"
     751             :                      "  LEFT JOIN pg_catalog.pg_index i"
     752             :                      "       ON (i.indexrelid = pg_get_replica_identity_index(%u))"
     753             :                      " WHERE a.attnum > 0::pg_catalog.int2"
     754             :                      "   AND NOT a.attisdropped %s"
     755             :                      "   AND a.attrelid = %u"
     756             :                      " ORDER BY a.attnum",
     757             :                      lrel->remoteid,
     758         176 :                      (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
     759             :                       "AND a.attgenerated = ''" : ""),
     760             :                      lrel->remoteid);
     761         176 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
     762             :                       lengthof(attrRow), attrRow);
     763             : 
     764         176 :     if (res->status != WALRCV_OK_TUPLES)
     765           0 :         ereport(ERROR,
     766             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
     767             :                  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
     768             :                         nspname, relname, res->err)));
     769             : 
     770             :     /* We don't know the number of rows coming, so allocate room for MaxTupleAttributeNumber entries. */
     771         176 :     lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
     772         176 :     lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
     773         176 :     lrel->attkeys = NULL;
     774             : 
     775         176 :     natt = 0;
     776         176 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     777         464 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     778             :     {
     779         576 :         lrel->attnames[natt] =
     780         288 :             TextDatumGetCString(slot_getattr(slot, 1, &isnull));
     781             :         Assert(!isnull);
     782         288 :         lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
     783             :         Assert(!isnull);
     784         288 :         if (DatumGetBool(slot_getattr(slot, 3, &isnull)))
     785         120 :             lrel->attkeys = bms_add_member(lrel->attkeys, natt);
     786             : 
     787             :         /* Should never happen: natt has reached the size of the arrays allocated above. */
     788         288 :         if (++natt >= MaxTupleAttributeNumber)
     789           0 :             elog(ERROR, "too many columns in remote table \"%s.%s\"",
     790             :                  nspname, relname);
     791             : 
     792         288 :         ExecClearTuple(slot);
     793             :     }
     794         176 :     ExecDropSingleTupleTableSlot(slot);
     795             : 
     796         176 :     lrel->natts = natt;
     797             : 
     798         176 :     walrcv_clear_result(res);
     799         176 :     pfree(cmd.data);
     800         176 : }
     801             : 
     802             : /*
     803             :  * Copy existing data of a table from publisher.
     804             :  * Runs COPY ... TO STDOUT on the publisher connection and feeds the result to a local COPY FROM via copy_read_data.
     805             :  * Caller is responsible for locking the local relation.
     806             :  */
     807             : static void
     808         176 : copy_table(Relation rel)
     809             : {
     810             :     LogicalRepRelMapEntry *relmapentry;
     811             :     LogicalRepRelation lrel;
     812             :     WalRcvExecResult *res;
     813             :     StringInfoData cmd;
     814             :     CopyFromState cstate;
     815             :     List       *attnamelist;
     816             :     ParseState *pstate;
     817             : 
     818             :     /* Get the publisher relation info, looked up by the local relation's schema and name. */
     819         176 :     fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
     820         176 :                             RelationGetRelationName(rel), &lrel);
     821             : 
     822             :     /* Put the relation into relmap. */
     823         176 :     logicalrep_relmap_update(&lrel);
     824             : 
     825             :     /* Map the publisher relation to local one. */
     826         176 :     relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
     827             :     Assert(rel == relmapentry->localrel);
     828             : 
     829             :     /* Start copy on the publisher: plain COPY for RELKIND_RELATION, COPY (SELECT ...) otherwise. */
     830         176 :     initStringInfo(&cmd);
     831         176 :     if (lrel.relkind == RELKIND_RELATION)
     832         168 :         appendStringInfo(&cmd, "COPY %s TO STDOUT",
     833         168 :                          quote_qualified_identifier(lrel.nspname, lrel.relname));
     834             :     else
     835             :     {
     836             :         /*
     837             :          * For non-tables, we need to do COPY (SELECT ...), but we can't just
     838             :          * do SELECT * because we need to not copy generated columns.
     839             :          */
     840           8 :         appendStringInfoString(&cmd, "COPY (SELECT ");
     841          24 :         for (int i = 0; i < lrel.natts; i++)
     842             :         {
     843          16 :             appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
     844          16 :             if (i < lrel.natts - 1)
     845           8 :                 appendStringInfoString(&cmd, ", ");
     846             :         }
     847           8 :         appendStringInfo(&cmd, " FROM %s) TO STDOUT",
     848           8 :                          quote_qualified_identifier(lrel.nspname, lrel.relname));
     849             :     }
     850         176 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
     851         176 :     pfree(cmd.data);
     852         176 :     if (res->status != WALRCV_OK_COPY_OUT)
     853           0 :         ereport(ERROR,
     854             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
     855             :                  errmsg("could not start initial contents copy for table \"%s.%s\": %s",
     856             :                         lrel.nspname, lrel.relname, res->err)));
     857         176 :     walrcv_clear_result(res);
     858             : 
     859         176 :     copybuf = makeStringInfo();
     860             : 
     861         176 :     pstate = make_parsestate(NULL);
     862         176 :     (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
     863             :                                          NULL, false, false);
     864             : 
     865         176 :     attnamelist = make_copy_attnamelist(relmapentry);
     866         176 :     cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, NIL);
     867             : 
     868             :     /* Do the copy; copy_read_data supplies the input. */
     869         176 :     (void) CopyFrom(cstate);
     870             : 
     871         166 :     logicalrep_rel_close(relmapentry, NoLock);
     872         166 : }
     873             : 
     874             : /*
     875             :  * Determine the tablesync slot name.
     876             :  *
     877             :  * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
     878             :  * on slot name length. We append system_identifier to avoid slot_name
     879             :  * collision with subscriptions in other clusters. With the current scheme
     880             :  * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
     881             :  * length of slot_name will be 50 bytes, including the terminating '\0'.
     882             :  *
     883             :  * The slot name is written with snprintf() into the supplied buffer
     884             :  * (syncslotname), whose size in bytes is given by szslot.
     885             :  *
     886             :  * Note: We don't use the subscription slot name as part of tablesync slot name
     887             :  * because we are responsible for cleaning up these slots and it could become
     888             :  * impossible to recalculate what name to clean up if the subscription slot name
     889             :  * had changed.
     890             :  */
     891             : void
     892         346 : ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
     893             :                                 char *syncslotname, int szslot)
     894             : {
     895         346 :     snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
     896             :              relid, GetSystemIdentifier());
     897         346 : }
     898             : 
     899             : /*
     900             :  * Form the origin name for tablesync, using the scheme pg_<suboid>_<relid>.
     901             :  *
     902             :  * The name is written with snprintf() into the supplied buffer (originname), whose size in bytes is given by szorgname.
     903             :  */
     904             : void
     905         342 : ReplicationOriginNameForTablesync(Oid suboid, Oid relid,
     906             :                                   char *originname, int szorgname)
     907             : {
     908         342 :     snprintf(originname, szorgname, "pg_%u_%u", suboid, relid);
     909         342 : }
     910             : 
     911             : /*
     912             :  * Start syncing the table in the sync worker.
     913             :  *
     914             :  * If nothing needs to be done to sync the table, we exit the worker without
     915             :  * any further action.
     916             :  *
     917             :  * The returned slot name is palloc'ed in current memory context.
     918             :  */
     919             : char *
     920         178 : LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
     921             : {
     922             :     char       *slotname;
     923             :     char       *err;
     924             :     char        relstate;
     925             :     XLogRecPtr  relstate_lsn;
     926             :     Relation    rel;
     927             :     WalRcvExecResult *res;
     928             :     char        originname[NAMEDATALEN];
     929             :     RepOriginId originid;
     930             : 
     931             :     /* Check the state of the table synchronization. */
     932         178 :     StartTransactionCommand();
     933         178 :     relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
     934         178 :                                        MyLogicalRepWorker->relid,
     935             :                                        &relstate_lsn);
     936         178 :     CommitTransactionCommand();
     937             : 
     938         178 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
     939         178 :     MyLogicalRepWorker->relstate = relstate;
     940         178 :     MyLogicalRepWorker->relstate_lsn = relstate_lsn;
     941         178 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
     942             : 
     943             :     /*
     944             :      * If synchronization is already done or no longer necessary, exit now
     945             :      * that we've updated shared memory state.
     946             :      */
     947         178 :     switch (relstate)
     948             :     {
     949           0 :         case SUBREL_STATE_SYNCDONE:
     950             :         case SUBREL_STATE_READY:
     951             :         case SUBREL_STATE_UNKNOWN:
     952           0 :             finish_sync_worker();   /* doesn't return */
     953             :     }
     954             : 
     955             :     /* Calculate the name of the tablesync slot. */
     956         178 :     slotname = (char *) palloc(NAMEDATALEN);
     957         178 :     ReplicationSlotNameForTablesync(MySubscription->oid,
     958         178 :                                     MyLogicalRepWorker->relid,
     959             :                                     slotname,
     960             :                                     NAMEDATALEN);
     961             : 
     962             :     /*
     963             :      * Here we use the slot name instead of the subscription name as the
     964             :      * application_name, so that it is different from the main apply worker,
     965             :      * so that synchronous replication can distinguish them.
     966             :      */
     967         178 :     LogRepWorkerWalRcvConn =
     968         178 :         walrcv_connect(MySubscription->conninfo, true, slotname, &err);
     969         178 :     if (LogRepWorkerWalRcvConn == NULL)
     970           0 :         ereport(ERROR,
     971             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
     972             :                  errmsg("could not connect to the publisher: %s", err)));
     973             : 
     974             :     Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
     975             :            MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
     976             :            MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
     977             : 
     978             :     /* Assign the origin tracking record name. */
     979         178 :     ReplicationOriginNameForTablesync(MySubscription->oid,
     980         178 :                                       MyLogicalRepWorker->relid,
     981             :                                       originname,
     982             :                                       sizeof(originname));
     983             : 
     984         178 :     if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
     985             :     {
     986             :         /*
     987             :          * We have previously errored out before finishing the copy so the
     988             :          * replication slot might exist. We want to remove the slot if it
     989             :          * already exists and proceed.
     990             :          *
     991             :          * XXX We could also instead try to drop the slot, last time we failed
     992             :          * but for that, we might need to clean up the copy state as it might
     993             :          * be in the middle of fetching the rows. Also, if there is a network
     994             :          * breakdown then it wouldn't have succeeded so trying it next time
     995             :          * seems like a better bet.
     996             :          */
     997          10 :         ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
     998             :     }
     999         168 :     else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
    1000             :     {
    1001             :         /*
    1002             :          * The COPY phase was previously done, but tablesync then crashed
    1003             :          * before it was able to finish normally.
    1004             :          */
    1005           0 :         StartTransactionCommand();
    1006             : 
    1007             :         /*
    1008             :          * The origin tracking name must already exist. It was created first
    1009             :          * time this tablesync was launched.
    1010             :          */
    1011           0 :         originid = replorigin_by_name(originname, false);
    1012           0 :         replorigin_session_setup(originid);
    1013           0 :         replorigin_session_origin = originid;
    1014           0 :         *origin_startpos = replorigin_session_get_progress(false);
    1015             : 
    1016           0 :         CommitTransactionCommand();
    1017             : 
    1018           0 :         goto copy_table_done;
    1019             :     }
    1020             : 
    1021         178 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
    1022         178 :     MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
    1023         178 :     MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
    1024         178 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
    1025             : 
    1026             :     /* Update the state and make it visible to others. */
    1027         178 :     StartTransactionCommand();
    1028         178 :     UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
    1029         178 :                                MyLogicalRepWorker->relid,
    1030         178 :                                MyLogicalRepWorker->relstate,
    1031         178 :                                MyLogicalRepWorker->relstate_lsn);
    1032         176 :     CommitTransactionCommand();
    1033         176 :     pgstat_report_stat(false);
    1034             : 
    1035         176 :     StartTransactionCommand();
    1036             : 
    1037             :     /*
    1038             :      * Use a standard write lock here. It might be better to disallow access
    1039             :      * to the table while it's being synchronized. But we don't want to block
    1040             :      * the main apply process from working and it has to open the relation in
    1041             :      * RowExclusiveLock when remapping remote relation id to local one.
    1042             :      */
    1043         176 :     rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
    1044             : 
    1045             :     /*
    1046             :      * Start a transaction in the remote node in REPEATABLE READ mode.  This
    1047             :      * ensures that both the replication slot we create (see below) and the
    1048             :      * COPY are consistent with each other.
    1049             :      */
    1050         176 :     res = walrcv_exec(LogRepWorkerWalRcvConn,
    1051             :                       "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
    1052             :                       0, NULL);
    1053         176 :     if (res->status != WALRCV_OK_COMMAND)
    1054           0 :         ereport(ERROR,
    1055             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1056             :                  errmsg("table copy could not start transaction on publisher: %s",
    1057             :                         res->err)));
    1058         176 :     walrcv_clear_result(res);
    1059             : 
    1060             :     /*
    1061             :      * Create a new permanent logical decoding slot. This slot will be used
    1062             :      * for the catchup phase after COPY is done, so tell it to use the
    1063             :      * snapshot to make the final data consistent.
    1064             :      *
    1065             :      * Prevent cancel/die interrupts while creating slot here because it is
    1066             :      * possible that before the server finishes this command, a concurrent
    1067             :      * drop subscription happens which would complete without removing this
    1068             :      * slot leading to a dangling slot on the server.
    1069             :      */
    1070         176 :     HOLD_INTERRUPTS();
    1071         176 :     walrcv_create_slot(LogRepWorkerWalRcvConn,
    1072             :                        slotname, false /* permanent */ , false /* two_phase */ ,
    1073             :                        CRS_USE_SNAPSHOT, origin_startpos);
    1074         176 :     RESUME_INTERRUPTS();
    1075             : 
    1076             :     /*
    1077             :      * Setup replication origin tracking. The purpose of doing this before the
    1078             :      * copy is to avoid doing the copy again due to any error in setting up
    1079             :      * origin tracking.
    1080             :      */
    1081         176 :     originid = replorigin_by_name(originname, true);
    1082         176 :     if (!OidIsValid(originid))
    1083             :     {
    1084             :         /*
    1085             :          * Origin tracking does not exist, so create it now.
    1086             :          *
    1087             :          * Then advance to the LSN obtained from walrcv_create_slot. This is
    1088             :          * WAL-logged for the purpose of recovery. Locks are taken to prevent
    1089             :          * the replication origin from vanishing while advancing.
    1090             :          */
    1091         176 :         originid = replorigin_create(originname);
    1092             : 
    1093         176 :         LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1094         176 :         replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
    1095             :                            true /* go backward */ , true /* WAL log */ );
    1096         176 :         UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1097             : 
    1098         176 :         replorigin_session_setup(originid);
    1099         176 :         replorigin_session_origin = originid;
    1100             :     }
    1101             :     else
    1102             :     {
    1103           0 :         ereport(ERROR,
    1104             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    1105             :                  errmsg("replication origin \"%s\" already exists",
    1106             :                         originname)));
    1107             :     }
    1108             : 
    1109             :     /* Now do the initial data copy */
    1110         176 :     PushActiveSnapshot(GetTransactionSnapshot());
    1111         176 :     copy_table(rel);
    1112         166 :     PopActiveSnapshot();
    1113             : 
    1114         166 :     res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
    1115         166 :     if (res->status != WALRCV_OK_COMMAND)
    1116           0 :         ereport(ERROR,
    1117             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1118             :                  errmsg("table copy could not finish transaction on publisher: %s",
    1119             :                         res->err)));
    1120         166 :     walrcv_clear_result(res);
    1121             : 
    1122         166 :     table_close(rel, NoLock);
    1123             : 
    1124             :     /* Make the copy visible. */
    1125         166 :     CommandCounterIncrement();
    1126             : 
    1127             :     /*
    1128             :      * Update the persisted state to indicate the COPY phase is done; make it
    1129             :      * visible to others.
    1130             :      */
    1131         166 :     UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
    1132         166 :                                MyLogicalRepWorker->relid,
    1133             :                                SUBREL_STATE_FINISHEDCOPY,
    1134         166 :                                MyLogicalRepWorker->relstate_lsn);
    1135             : 
    1136         166 :     CommitTransactionCommand();
    1137             : 
    1138         166 : copy_table_done:
    1139             : 
    1140         166 :     elog(DEBUG1,
    1141             :          "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
    1142             :          originname, LSN_FORMAT_ARGS(*origin_startpos));
    1143             : 
    1144             :     /*
    1145             :      * We are done with the initial data synchronization, update the state.
    1146             :      */
    1147         166 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
    1148         166 :     MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
    1149         166 :     MyLogicalRepWorker->relstate_lsn = *origin_startpos;
    1150         166 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
    1151             : 
    1152             :     /*
    1153             :      * Finally, wait until the main apply worker tells us to catch up and then
    1154             :      * return to let LogicalRepApplyLoop do it.
    1155             :      */
    1156         166 :     wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
    1157         166 :     return slotname;
    1158             : }
    1159             : 
    1160             : /*
    1161             :  * Common code to fetch the up-to-date sync state info into the static lists.
    1162             :  *
    1163             :  * Returns true if subscription has 1 or more tables, else false.
    1164             :  *
    1165             :  * Note: If this function started the transaction (indicated by the parameter)
    1166             :  * then it is the caller's responsibility to commit it.
    1167             :  */
    1168             : static bool
    1169       11228 : FetchTableStates(bool *started_tx)
    1170             : {
    1171             :     static bool has_subrels = false;
    1172             : 
    1173       11228 :     *started_tx = false;
    1174             : 
    1175       11228 :     if (!table_states_valid)
    1176             :     {
    1177             :         MemoryContext oldctx;
    1178             :         List       *rstates;
    1179             :         ListCell   *lc;
    1180             :         SubscriptionRelState *rstate;
    1181             : 
    1182             :         /* Clean the old lists. */
    1183         668 :         list_free_deep(table_states_not_ready);
    1184         668 :         table_states_not_ready = NIL;
    1185             : 
    1186         668 :         if (!IsTransactionState())
    1187             :         {
    1188         668 :             StartTransactionCommand();
    1189         668 :             *started_tx = true;
    1190             :         }
    1191             : 
    1192             :         /* Fetch all non-ready tables. */
    1193         668 :         rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
    1194             : 
    1195             :         /* Allocate the tracking info in a permanent memory context. */
    1196         668 :         oldctx = MemoryContextSwitchTo(CacheMemoryContext);
    1197        1822 :         foreach(lc, rstates)
    1198             :         {
    1199        1154 :             rstate = palloc(sizeof(SubscriptionRelState));
    1200        1154 :             memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
    1201        1154 :             table_states_not_ready = lappend(table_states_not_ready, rstate);
    1202             :         }
    1203         668 :         MemoryContextSwitchTo(oldctx);
    1204             : 
    1205             :         /*
    1206             :          * Does the subscription have tables?
    1207             :          *
    1208             :          * If there were not-READY relations found then we know it does. But
    1209             :          * if table_states_not_ready was empty we still need to check again to
    1210             :          * see if there are 0 tables.
    1211             :          */
    1212         840 :         has_subrels = (list_length(table_states_not_ready) > 0) ||
    1213         172 :             HasSubscriptionRelations(MySubscription->oid);
    1214             : 
    1215         668 :         table_states_valid = true;
    1216             :     }
    1217             : 
    1218       11228 :     return has_subrels;
    1219             : }
    1220             : 
    1221             : /*
    1222             :  * If the subscription has no tables then return false.
    1223             :  *
    1224             :  * Otherwise, are all tablesyncs READY?
    1225             :  *
    1226             :  * Note: This function is not suitable to be called from outside of apply or
    1227             :  * tablesync workers because MySubscription needs to be already initialized.
    1228             :  */
    1229             : bool
    1230          68 : AllTablesyncsReady(void)
    1231             : {
    1232          68 :     bool        started_tx = false;
    1233          68 :     bool        has_subrels = false;
    1234             : 
    1235             :     /* We need up-to-date sync state info for subscription tables here. */
    1236          68 :     has_subrels = FetchTableStates(&started_tx);
    1237             : 
    1238          68 :     if (started_tx)
    1239             :     {
    1240          12 :         CommitTransactionCommand();
    1241          12 :         pgstat_report_stat(false);
    1242             :     }
    1243             : 
    1244             :     /*
    1245             :      * Return false when the subscription has no tables or not all tables
    1246             :      * are in READY state; true otherwise.
    1247             :      */
    1248          68 :     return has_subrels && list_length(table_states_not_ready) == 0;
    1249             : }
    1250             : 
    1251             : /*
    1252             :  * Update the two_phase state of the specified subscription in pg_subscription.
    1253             :  */
    1254             : void
    1255           6 : UpdateTwoPhaseState(Oid suboid, char new_state)
    1256             : {
    1257             :     Relation    rel;
    1258             :     HeapTuple   tup;
    1259             :     bool        nulls[Natts_pg_subscription];
    1260             :     bool        replaces[Natts_pg_subscription];
    1261             :     Datum       values[Natts_pg_subscription];
    1262             : 
    1263             :     Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
    1264             :            new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
    1265             :            new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
    1266             : 
    1267           6 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    1268           6 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
    1269           6 :     if (!HeapTupleIsValid(tup))
    1270           0 :         elog(ERROR,
    1271             :              "cache lookup failed for subscription oid %u",
    1272             :              suboid);
    1273             : 
    1274             :     /* Form a new tuple. */
    1275           6 :     memset(values, 0, sizeof(values));
    1276           6 :     memset(nulls, false, sizeof(nulls));
    1277           6 :     memset(replaces, false, sizeof(replaces));
    1278             : 
    1279             :     /* And update/set two_phase state */
    1280           6 :     values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
    1281           6 :     replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
    1282             : 
    1283           6 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel),
    1284             :                             values, nulls, replaces);
    1285           6 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    1286             : 
    1287           6 :     heap_freetuple(tup);
    1288           6 :     table_close(rel, RowExclusiveLock);
    1289           6 : }

Generated by: LCOV version 1.14