LCOV - code coverage report
Current view: top level - src/backend/replication/logical - tablesync.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 484 527 91.8 %
Date: 2024-04-27 01:11:30 Functions: 19 19 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * tablesync.c
       3             :  *    PostgreSQL logical replication: initial table data synchronization
       4             :  *
       5             :  * Copyright (c) 2012-2024, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/tablesync.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file contains code for initial table data synchronization for
      12             :  *    logical replication.
      13             :  *
      14             :  *    The initial data synchronization is done separately for each table,
      15             :  *    in a separate apply worker that only fetches the initial snapshot data
      16             :  *    from the publisher and then synchronizes the position in the stream with
      17             :  *    the leader apply worker.
      18             :  *
      19             :  *    There are several reasons for doing the synchronization this way:
      20             :  *     - It allows us to parallelize the initial data synchronization
      21             :  *       which lowers the time needed for it to happen.
      22             :  *     - The initial synchronization does not have to hold the xid and LSN
      23             :  *       for the time it takes to copy data of all tables, causing less
      24             :  *       bloat and lower disk consumption compared to doing the
      25             :  *       synchronization in a single process for the whole database.
      26             :  *     - It allows us to synchronize any tables added after the initial
      27             :  *       synchronization has finished.
      28             :  *
      29             :  *    The stream position synchronization works in multiple steps:
      30             :  *     - Apply worker requests a tablesync worker to start, setting the new
      31             :  *       table state to INIT.
      32             :  *     - Tablesync worker starts; changes table state from INIT to DATASYNC while
      33             :  *       copying.
      34             :  *     - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
      35             :  *       worker specific) state to indicate when the copy phase has completed, so
      36             :  *       if the worker crashes with this (non-memory) state then the copy will not
      37             :  *       be re-attempted.
      38             :  *     - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
      39             :  *     - Apply worker periodically checks for tables in SYNCWAIT state.  When
      40             :  *       any appear, it sets the table state to CATCHUP and starts loop-waiting
      41             :  *       until either the table state is set to SYNCDONE or the sync worker
      42             :  *       exits.
      43             :  *     - After the sync worker has seen the state change to CATCHUP, it will
      44             :  *       read the stream and apply changes (acting like an apply worker) until
      45             :  *       it catches up to the specified stream position.  Then it sets the
      46             :  *       state to SYNCDONE.  There might be zero changes applied between
      47             :  *       CATCHUP and SYNCDONE, because the sync worker might be ahead of the
      48             :  *       apply worker.
      49             :  *     - Once the state is set to SYNCDONE, the apply will continue tracking
      50             :  *       the table until it reaches the SYNCDONE stream position, at which
      51             :  *       point it sets state to READY and stops tracking.  Again, there might
      52             :  *       be zero changes in between.
      53             :  *
      54             :  *    So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
      55             :  *    -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
      56             :  *
      57             :  *    The catalog pg_subscription_rel is used to keep information about
      58             :  *    subscribed tables and their state.  The catalog holds all states
      59             :  *    except SYNCWAIT and CATCHUP which are only in shared memory.
      60             :  *
      61             :  *    Example flows look like this:
      62             :  *     - Apply is in front:
      63             :  *        sync:8
      64             :  *          -> set in catalog FINISHEDCOPY
      65             :  *          -> set in memory SYNCWAIT
      66             :  *        apply:10
      67             :  *          -> set in memory CATCHUP
      68             :  *          -> enter wait-loop
      69             :  *        sync:10
      70             :  *          -> set in catalog SYNCDONE
      71             :  *          -> exit
      72             :  *        apply:10
      73             :  *          -> exit wait-loop
      74             :  *          -> continue rep
      75             :  *        apply:11
      76             :  *          -> set in catalog READY
      77             :  *
      78             :  *     - Sync is in front:
      79             :  *        sync:10
      80             :  *          -> set in catalog FINISHEDCOPY
      81             :  *          -> set in memory SYNCWAIT
      82             :  *        apply:8
      83             :  *          -> set in memory CATCHUP
      84             :  *          -> continue per-table filtering
      85             :  *        sync:10
      86             :  *          -> set in catalog SYNCDONE
      87             :  *          -> exit
      88             :  *        apply:10
      89             :  *          -> set in catalog READY
      90             :  *          -> stop per-table filtering
      91             :  *          -> continue rep
      92             :  *-------------------------------------------------------------------------
      93             :  */
      94             : 
      95             : #include "postgres.h"
      96             : 
      97             : #include "access/table.h"
      98             : #include "access/xact.h"
      99             : #include "catalog/indexing.h"
     100             : #include "catalog/pg_subscription_rel.h"
     101             : #include "catalog/pg_type.h"
     102             : #include "commands/copy.h"
     103             : #include "miscadmin.h"
     104             : #include "nodes/makefuncs.h"
     105             : #include "parser/parse_relation.h"
     106             : #include "pgstat.h"
     107             : #include "replication/logicallauncher.h"
     108             : #include "replication/logicalrelation.h"
     109             : #include "replication/logicalworker.h"
     110             : #include "replication/origin.h"
     111             : #include "replication/slot.h"
     112             : #include "replication/walreceiver.h"
     113             : #include "replication/worker_internal.h"
     114             : #include "storage/ipc.h"
     115             : #include "storage/lmgr.h"
     116             : #include "utils/acl.h"
     117             : #include "utils/array.h"
     118             : #include "utils/builtins.h"
     119             : #include "utils/lsyscache.h"
     120             : #include "utils/memutils.h"
     121             : #include "utils/rls.h"
     122             : #include "utils/snapmgr.h"
     123             : #include "utils/syscache.h"
     124             : #include "utils/usercontext.h"
     125             : 
     126             : typedef enum
     127             : {
     128             :     SYNC_TABLE_STATE_NEEDS_REBUILD,
     129             :     SYNC_TABLE_STATE_REBUILD_STARTED,
     130             :     SYNC_TABLE_STATE_VALID,
     131             : } SyncingTablesState;
     132             : 
     133             : static SyncingTablesState table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
     134             : static List *table_states_not_ready = NIL;
     135             : static bool FetchTableStates(bool *started_tx);
     136             : 
     137             : static StringInfo copybuf = NULL;
     138             : 
     139             : /*
     140             :  * Exit routine for synchronization worker.
     141             :  */
     142             : static void
     143             : pg_attribute_noreturn()
     144         320 : finish_sync_worker(void)
     145             : {
     146             :     /*
     147             :      * Commit any outstanding transaction. This is the usual case, unless
     148             :      * there was nothing to do for the table.
     149             :      */
     150         320 :     if (IsTransactionState())
     151             :     {
     152         320 :         CommitTransactionCommand();
     153         320 :         pgstat_report_stat(true);
     154             :     }
     155             : 
     156             :     /* And flush all writes. */
     157         320 :     XLogFlush(GetXLogWriteRecPtr());
     158             : 
     159         320 :     StartTransactionCommand();
     160         320 :     ereport(LOG,
     161             :             (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
     162             :                     MySubscription->name,
     163             :                     get_rel_name(MyLogicalRepWorker->relid))));
     164         320 :     CommitTransactionCommand();
     165             : 
     166             :     /* Find the leader apply worker and signal it. */
     167         320 :     logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
     168             : 
     169             :     /* Stop gracefully */
     170         320 :     proc_exit(0);
     171             : }
     172             : 
     173             : /*
     174             :  * Wait until the relation sync state is set in the catalog to the expected
     175             :  * one; return true when it happens.
     176             :  *
     177             :  * Returns false if the table sync worker or the table itself have
     178             :  * disappeared, or the table state has been reset.
     179             :  *
     180             :  * Currently, this is used in the apply worker when transitioning from
     181             :  * CATCHUP state to SYNCDONE.
     182             :  */
     183             : static bool
     184         632 : wait_for_relation_state_change(Oid relid, char expected_state)
     185             : {
     186             :     char        state;
     187             : 
     188             :     for (;;)
     189         316 :     {
     190             :         LogicalRepWorker *worker;
     191             :         XLogRecPtr  statelsn;
     192             : 
     193         632 :         CHECK_FOR_INTERRUPTS();
     194             : 
     195         632 :         InvalidateCatalogSnapshot();
     196         632 :         state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
     197             :                                         relid, &statelsn);
     198             : 
     199         632 :         if (state == SUBREL_STATE_UNKNOWN)
     200           0 :             break;
     201             : 
     202         632 :         if (state == expected_state)
     203         316 :             return true;
     204             : 
     205             :         /* Check if the sync worker is still running and bail if not. */
     206         316 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     207         316 :         worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
     208             :                                         false);
     209         316 :         LWLockRelease(LogicalRepWorkerLock);
     210         316 :         if (!worker)
     211           0 :             break;
     212             : 
     213         316 :         (void) WaitLatch(MyLatch,
     214             :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     215             :                          1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
     216             : 
     217         316 :         ResetLatch(MyLatch);
     218             :     }
     219             : 
     220           0 :     return false;
     221             : }
     222             : 
     223             : /*
     224             :  * Wait until the apply worker changes the state of our synchronization
     225             :  * worker to the expected one.
     226             :  *
     227             :  * Used when transitioning from SYNCWAIT state to CATCHUP.
     228             :  *
     229             :  * Returns false if the apply worker has disappeared.
     230             :  */
     231             : static bool
     232         642 : wait_for_worker_state_change(char expected_state)
     233             : {
     234             :     int         rc;
     235             : 
     236             :     for (;;)
     237         322 :     {
     238             :         LogicalRepWorker *worker;
     239             : 
     240         642 :         CHECK_FOR_INTERRUPTS();
     241             : 
     242             :         /*
     243             :          * Done if already in correct state.  (We assume this fetch is atomic
     244             :          * enough to not give a misleading answer if we do it with no lock.)
     245             :          */
     246         642 :         if (MyLogicalRepWorker->relstate == expected_state)
     247         320 :             return true;
     248             : 
     249             :         /*
     250             :          * Bail out if the apply worker has died, else signal it we're
     251             :          * waiting.
     252             :          */
     253         322 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     254         322 :         worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
     255             :                                         InvalidOid, false);
     256         322 :         if (worker && worker->proc)
     257         322 :             logicalrep_worker_wakeup_ptr(worker);
     258         322 :         LWLockRelease(LogicalRepWorkerLock);
     259         322 :         if (!worker)
     260           0 :             break;
     261             : 
     262             :         /*
     263             :          * Wait.  We expect to get a latch signal back from the apply worker,
     264             :          * but use a timeout in case it dies without sending one.
     265             :          */
     266         322 :         rc = WaitLatch(MyLatch,
     267             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     268             :                        1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
     269             : 
     270         322 :         if (rc & WL_LATCH_SET)
     271         322 :             ResetLatch(MyLatch);
     272             :     }
     273             : 
     274           0 :     return false;
     275             : }
     276             : 
     277             : /*
     278             :  * Callback from syscache invalidation.
     279             :  */
     280             : void
     281        2924 : invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
     282             : {
     283        2924 :     table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
     284        2924 : }
     285             : 
     286             : /*
     287             :  * Handle table synchronization cooperation from the synchronization
     288             :  * worker.
     289             :  *
     290             :  * If the sync worker is in CATCHUP state and reached (or passed) the
     291             :  * predetermined synchronization point in the WAL stream, mark the table as
     292             :  * SYNCDONE and finish.
     293             :  */
     294             : static void
     295         326 : process_syncing_tables_for_sync(XLogRecPtr current_lsn)
     296             : {
     297         326 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
     298             : 
     299         326 :     if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
     300         326 :         current_lsn >= MyLogicalRepWorker->relstate_lsn)
     301             :     {
     302             :         TimeLineID  tli;
     303         320 :         char        syncslotname[NAMEDATALEN] = {0};
     304         320 :         char        originname[NAMEDATALEN] = {0};
     305             : 
     306         320 :         MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
     307         320 :         MyLogicalRepWorker->relstate_lsn = current_lsn;
     308             : 
     309         320 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
     310             : 
     311             :         /*
     312             :          * UpdateSubscriptionRelState must be called within a transaction.
     313             :          */
     314         320 :         if (!IsTransactionState())
     315         320 :             StartTransactionCommand();
     316             : 
     317         320 :         UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
     318         320 :                                    MyLogicalRepWorker->relid,
     319         320 :                                    MyLogicalRepWorker->relstate,
     320         320 :                                    MyLogicalRepWorker->relstate_lsn);
     321             : 
     322             :         /*
     323             :          * End streaming so that LogRepWorkerWalRcvConn can be used to drop
     324             :          * the slot.
     325             :          */
     326         320 :         walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
     327             : 
     328             :         /*
     329             :          * Cleanup the tablesync slot.
     330             :          *
     331             :          * This has to be done after updating the state because otherwise if
     332             :          * there is an error while doing the database operations we won't be
     333             :          * able to rollback dropped slot.
     334             :          */
     335         320 :         ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
     336         320 :                                         MyLogicalRepWorker->relid,
     337             :                                         syncslotname,
     338             :                                         sizeof(syncslotname));
     339             : 
     340             :         /*
     341             :          * It is important to give an error if we are unable to drop the slot,
     342             :          * otherwise, it won't be dropped till the corresponding subscription
     343             :          * is dropped. So passing missing_ok = false.
     344             :          */
     345         320 :         ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
     346             : 
     347         320 :         CommitTransactionCommand();
     348         320 :         pgstat_report_stat(false);
     349             : 
     350             :         /*
     351             :          * Start a new transaction to clean up the tablesync origin tracking.
     352             :          * This transaction will be ended within the finish_sync_worker().
     353             :          * Now, even, if we fail to remove this here, the apply worker will
     354             :          * ensure to clean it up afterward.
     355             :          *
     356             :          * We need to do this after the table state is set to SYNCDONE.
     357             :          * Otherwise, if an error occurs while performing the database
     358             :          * operation, the worker will be restarted and the in-memory state of
     359             :          * replication progress (remote_lsn) won't be rolled-back which would
     360             :          * have been cleared before restart. So, the restarted worker will use
     361             :          * invalid replication progress state resulting in replay of
     362             :          * transactions that have already been applied.
     363             :          */
     364         320 :         StartTransactionCommand();
     365             : 
     366         320 :         ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
     367         320 :                                            MyLogicalRepWorker->relid,
     368             :                                            originname,
     369             :                                            sizeof(originname));
     370             : 
     371             :         /*
     372             :          * Resetting the origin session removes the ownership of the slot.
     373             :          * This is needed to allow the origin to be dropped.
     374             :          */
     375         320 :         replorigin_session_reset();
     376         320 :         replorigin_session_origin = InvalidRepOriginId;
     377         320 :         replorigin_session_origin_lsn = InvalidXLogRecPtr;
     378         320 :         replorigin_session_origin_timestamp = 0;
     379             : 
     380             :         /*
     381             :          * Drop the tablesync's origin tracking if exists.
     382             :          *
     383             :          * There is a chance that the user is concurrently performing refresh
     384             :          * for the subscription where we remove the table state and its origin
     385             :          * or the apply worker would have removed this origin. So passing
     386             :          * missing_ok = true.
     387             :          */
     388         320 :         replorigin_drop_by_name(originname, true, false);
     389             : 
     390         320 :         finish_sync_worker();
     391             :     }
     392             :     else
     393           6 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
     394           6 : }
     395             : 
     396             : /*
     397             :  * Handle table synchronization cooperation from the apply worker.
     398             :  *
     399             :  * Walk over all subscription tables that are individually tracked by the
     400             :  * apply process (currently, all that have state other than
     401             :  * SUBREL_STATE_READY) and manage synchronization for them.
     402             :  *
     403             :  * If there are tables that need synchronizing and are not being synchronized
     404             :  * yet, start sync workers for them (if there are free slots for sync
     405             :  * workers).  To prevent starting the sync worker for the same relation at a
     406             :  * high frequency after a failure, we store its last start time with each sync
     407             :  * state info.  We start the sync worker for the same relation after waiting
     408             :  * at least wal_retrieve_retry_interval.
     409             :  *
     410             :  * For tables that are being synchronized already, check if sync workers
     411             :  * either need action from the apply worker or have finished.  This is the
     412             :  * SYNCWAIT to CATCHUP transition.
     413             :  *
     414             :  * If the synchronization position is reached (SYNCDONE), then the table can
     415             :  * be marked as READY and is no longer tracked.
     416             :  */
     417             : static void
     418        8770 : process_syncing_tables_for_apply(XLogRecPtr current_lsn)
     419             : {
     420             :     struct tablesync_start_time_mapping
     421             :     {
     422             :         Oid         relid;
     423             :         TimestampTz last_start_time;
     424             :     };
     425             :     static HTAB *last_start_times = NULL;
     426             :     ListCell   *lc;
     427        8770 :     bool        started_tx = false;
     428        8770 :     bool        should_exit = false;
     429             : 
     430             :     Assert(!IsTransactionState());
     431             : 
     432             :     /* We need up-to-date sync state info for subscription tables here. */
     433        8770 :     FetchTableStates(&started_tx);
     434             : 
     435             :     /*
     436             :      * Prepare a hash table for tracking last start times of workers, to avoid
     437             :      * immediate restarts.  We don't need it if there are no tables that need
     438             :      * syncing.
     439             :      */
     440        8770 :     if (table_states_not_ready != NIL && !last_start_times)
     441         202 :     {
     442             :         HASHCTL     ctl;
     443             : 
     444         202 :         ctl.keysize = sizeof(Oid);
     445         202 :         ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
     446         202 :         last_start_times = hash_create("Logical replication table sync worker start times",
     447             :                                        256, &ctl, HASH_ELEM | HASH_BLOBS);
     448             :     }
     449             : 
     450             :     /*
     451             :      * Clean up the hash table when we're done with all tables (just to
     452             :      * release the bit of memory).
     453             :      */
     454        8568 :     else if (table_states_not_ready == NIL && last_start_times)
     455             :     {
     456         150 :         hash_destroy(last_start_times);
     457         150 :         last_start_times = NULL;
     458             :     }
     459             : 
     460             :     /*
     461             :      * Process all tables that are being synchronized.
     462             :      */
     463       11188 :     foreach(lc, table_states_not_ready)
     464             :     {
     465        2418 :         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
     466             : 
     467        2418 :         if (rstate->state == SUBREL_STATE_SYNCDONE)
     468             :         {
     469             :             /*
     470             :              * Apply has caught up to the position where the table sync has
     471             :              * finished.  Mark the table as ready so that the apply will just
     472             :              * continue to replicate it normally.
     473             :              */
     474         318 :             if (current_lsn >= rstate->lsn)
     475             :             {
     476             :                 char        originname[NAMEDATALEN];
     477             : 
     478         316 :                 rstate->state = SUBREL_STATE_READY;
     479         316 :                 rstate->lsn = current_lsn;
     480         316 :                 if (!started_tx)
     481             :                 {
     482          16 :                     StartTransactionCommand();
     483          16 :                     started_tx = true;
     484             :                 }
     485             : 
     486             :                 /*
     487             :                  * Remove the tablesync origin tracking if exists.
     488             :                  *
     489             :                  * There is a chance that the user is concurrently performing
     490             :                  * refresh for the subscription where we remove the table
     491             :                  * state and its origin or the tablesync worker would have
     492             :                  * already removed this origin. We can't rely on tablesync
     493             :                  * worker to remove the origin tracking as if there is any
     494             :                  * error while dropping we won't restart it to drop the
     495             :                  * origin. So passing missing_ok = true.
     496             :                  */
     497         316 :                 ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
     498             :                                                    rstate->relid,
     499             :                                                    originname,
     500             :                                                    sizeof(originname));
     501         316 :                 replorigin_drop_by_name(originname, true, false);
     502             : 
     503             :                 /*
     504             :                  * Update the state to READY only after the origin cleanup.
     505             :                  */
     506         316 :                 UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
     507         316 :                                            rstate->relid, rstate->state,
     508             :                                            rstate->lsn);
     509             :             }
     510             :         }
     511             :         else
     512             :         {
     513             :             LogicalRepWorker *syncworker;
     514             : 
     515             :             /*
     516             :              * Look for a sync worker for this relation.
     517             :              */
     518        2100 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     519             : 
     520        2100 :             syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
     521             :                                                 rstate->relid, false);
     522             : 
     523        2100 :             if (syncworker)
     524             :             {
     525             :                 /* Found one, update our copy of its state */
     526         938 :                 SpinLockAcquire(&syncworker->relmutex);
     527         938 :                 rstate->state = syncworker->relstate;
     528         938 :                 rstate->lsn = syncworker->relstate_lsn;
     529         938 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
     530             :                 {
     531             :                     /*
     532             :                      * Sync worker is waiting for apply.  Tell sync worker it
     533             :                      * can catchup now.
     534             :                      */
     535         316 :                     syncworker->relstate = SUBREL_STATE_CATCHUP;
     536         316 :                     syncworker->relstate_lsn =
     537         316 :                         Max(syncworker->relstate_lsn, current_lsn);
     538             :                 }
     539         938 :                 SpinLockRelease(&syncworker->relmutex);
     540             : 
     541             :                 /* If we told worker to catch up, wait for it. */
     542         938 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
     543             :                 {
     544             :                     /* Signal the sync worker, as it may be waiting for us. */
     545         316 :                     if (syncworker->proc)
     546         316 :                         logicalrep_worker_wakeup_ptr(syncworker);
     547             : 
     548             :                     /* Now safe to release the LWLock */
     549         316 :                     LWLockRelease(LogicalRepWorkerLock);
     550             : 
     551         316 :                     if (started_tx)
     552             :                     {
     553             :                         /*
     554             :                          * We must commit the existing transaction to release
     555             :                          * the existing locks before entering a busy loop.
     556             :                          * This is required to avoid any undetected deadlocks
     557             :                          * due to any existing lock as deadlock detector won't
     558             :                          * be able to detect the waits on the latch.
     559             :                          */
     560         316 :                         CommitTransactionCommand();
     561         316 :                         pgstat_report_stat(false);
     562             :                     }
     563             : 
     564             :                     /*
     565             :                      * Enter busy loop and wait for synchronization worker to
     566             :                      * reach expected state (or die trying).
     567             :                      */
     568         316 :                     StartTransactionCommand();
     569         316 :                     started_tx = true;
     570             : 
     571         316 :                     wait_for_relation_state_change(rstate->relid,
     572             :                                                    SUBREL_STATE_SYNCDONE);
     573             :                 }
     574             :                 else
     575         622 :                     LWLockRelease(LogicalRepWorkerLock);
     576             :             }
     577             :             else
     578             :             {
     579             :                 /*
     580             :                  * If there is no sync worker for this table yet, count
     581             :                  * running sync workers for this subscription, while we have
     582             :                  * the lock.
     583             :                  */
     584             :                 int         nsyncworkers =
     585        1162 :                     logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
     586             : 
     587             :                 /* Now safe to release the LWLock */
     588        1162 :                 LWLockRelease(LogicalRepWorkerLock);
     589             : 
     590             :                 /*
     591             :                  * If there are free sync worker slot(s), start a new sync
     592             :                  * worker for the table.
     593             :                  */
     594        1162 :                 if (nsyncworkers < max_sync_workers_per_subscription)
     595             :                 {
     596         364 :                     TimestampTz now = GetCurrentTimestamp();
     597             :                     struct tablesync_start_time_mapping *hentry;
     598             :                     bool        found;
     599             : 
     600         364 :                     hentry = hash_search(last_start_times, &rstate->relid,
     601             :                                          HASH_ENTER, &found);
     602             : 
     603         402 :                     if (!found ||
     604          38 :                         TimestampDifferenceExceeds(hentry->last_start_time, now,
     605             :                                                    wal_retrieve_retry_interval))
     606             :                     {
     607         338 :                         logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
     608         338 :                                                  MyLogicalRepWorker->dbid,
     609         338 :                                                  MySubscription->oid,
     610         338 :                                                  MySubscription->name,
     611         338 :                                                  MyLogicalRepWorker->userid,
     612             :                                                  rstate->relid,
     613             :                                                  DSM_HANDLE_INVALID);
     614         338 :                         hentry->last_start_time = now;
     615             :                     }
     616             :                 }
     617             :             }
     618             :         }
     619             :     }
     620             : 
     621        8770 :     if (started_tx)
     622             :     {
     623             :         /*
     624             :          * Even when the two_phase mode is requested by the user, it remains
     625             :          * as 'pending' until all tablesyncs have reached READY state.
     626             :          *
     627             :          * When this happens, we restart the apply worker and (if the
     628             :          * conditions are still ok) then the two_phase tri-state will become
     629             :          * 'enabled' at that time.
     630             :          *
     631             :          * Note: If the subscription has no tables then leave the state as
     632             :          * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
     633             :          * work.
     634             :          */
     635        1408 :         if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
     636             :         {
     637          50 :             CommandCounterIncrement();  /* make updates visible */
     638          50 :             if (AllTablesyncsReady())
     639             :             {
     640          12 :                 ereport(LOG,
     641             :                         (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
     642             :                                 MySubscription->name)));
     643          12 :                 should_exit = true;
     644             :             }
     645             :         }
     646             : 
     647        1408 :         CommitTransactionCommand();
     648        1408 :         pgstat_report_stat(true);
     649             :     }
     650             : 
     651        8770 :     if (should_exit)
     652             :     {
     653             :         /*
     654             :          * Reset the last-start time for this worker so that the launcher will
     655             :          * restart it without waiting for wal_retrieve_retry_interval.
     656             :          */
     657          12 :         ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
     658             : 
     659          12 :         proc_exit(0);
     660             :     }
     661        8758 : }
     662             : 
     663             : /*
     664             :  * Process possible state change(s) of tables that are being synchronized.
     665             :  */
     666             : void
     667        9140 : process_syncing_tables(XLogRecPtr current_lsn)
     668             : {
     669        9140 :     switch (MyLogicalRepWorker->type)
     670             :     {
     671          44 :         case WORKERTYPE_PARALLEL_APPLY:
     672             : 
     673             :             /*
     674             :              * Skip for parallel apply workers because they only operate on
     675             :              * tables that are in a READY state. See pa_can_start() and
     676             :              * should_apply_changes_for_rel().
     677             :              */
     678          44 :             break;
     679             : 
     680         326 :         case WORKERTYPE_TABLESYNC:
     681         326 :             process_syncing_tables_for_sync(current_lsn);
     682           6 :             break;
     683             : 
     684        8770 :         case WORKERTYPE_APPLY:
     685        8770 :             process_syncing_tables_for_apply(current_lsn);
     686        8758 :             break;
     687             : 
     688           0 :         case WORKERTYPE_UNKNOWN:
     689             :             /* Should never happen. */
     690           0 :             elog(ERROR, "Unknown worker type");
     691             :     }
     692        8808 : }
     693             : 
     694             : /*
     695             :  * Create list of columns for COPY based on logical relation mapping.
     696             :  */
     697             : static List *
     698         342 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
     699             : {
     700         342 :     List       *attnamelist = NIL;
     701             :     int         i;
     702             : 
     703         886 :     for (i = 0; i < rel->remoterel.natts; i++)
     704             :     {
     705         544 :         attnamelist = lappend(attnamelist,
     706         544 :                               makeString(rel->remoterel.attnames[i]));
     707             :     }
     708             : 
     709             : 
     710         342 :     return attnamelist;
     711             : }
     712             : 
     713             : /*
     714             :  * Data source callback for the COPY FROM, which reads from the remote
     715             :  * connection and passes the data back to our local COPY.
     716             :  */
     717             : static int
     718       27964 : copy_read_data(void *outbuf, int minread, int maxread)
     719             : {
     720       27964 :     int         bytesread = 0;
     721             :     int         avail;
     722             : 
     723             :     /* If there are some leftover data from previous read, use it. */
     724       27964 :     avail = copybuf->len - copybuf->cursor;
     725       27964 :     if (avail)
     726             :     {
     727           0 :         if (avail > maxread)
     728           0 :             avail = maxread;
     729           0 :         memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
     730           0 :         copybuf->cursor += avail;
     731           0 :         maxread -= avail;
     732           0 :         bytesread += avail;
     733             :     }
     734             : 
     735       27966 :     while (maxread > 0 && bytesread < minread)
     736             :     {
     737       27966 :         pgsocket    fd = PGINVALID_SOCKET;
     738             :         int         len;
     739       27966 :         char       *buf = NULL;
     740             : 
     741             :         for (;;)
     742             :         {
     743             :             /* Try read the data. */
     744       27966 :             len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
     745             : 
     746       27966 :             CHECK_FOR_INTERRUPTS();
     747             : 
     748       27966 :             if (len == 0)
     749           2 :                 break;
     750       27964 :             else if (len < 0)
     751       27964 :                 return bytesread;
     752             :             else
     753             :             {
     754             :                 /* Process the data */
     755       27626 :                 copybuf->data = buf;
     756       27626 :                 copybuf->len = len;
     757       27626 :                 copybuf->cursor = 0;
     758             : 
     759       27626 :                 avail = copybuf->len - copybuf->cursor;
     760       27626 :                 if (avail > maxread)
     761           0 :                     avail = maxread;
     762       27626 :                 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
     763       27626 :                 outbuf = (void *) ((char *) outbuf + avail);
     764       27626 :                 copybuf->cursor += avail;
     765       27626 :                 maxread -= avail;
     766       27626 :                 bytesread += avail;
     767             :             }
     768             : 
     769       27626 :             if (maxread <= 0 || bytesread >= minread)
     770       27626 :                 return bytesread;
     771             :         }
     772             : 
     773             :         /*
     774             :          * Wait for more data or latch.
     775             :          */
     776           2 :         (void) WaitLatchOrSocket(MyLatch,
     777             :                                  WL_SOCKET_READABLE | WL_LATCH_SET |
     778             :                                  WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     779             :                                  fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
     780             : 
     781           2 :         ResetLatch(MyLatch);
     782             :     }
     783             : 
     784           0 :     return bytesread;
     785             : }
     786             : 
     787             : 
     788             : /*
     789             :  * Get information about remote relation in similar fashion the RELATION
     790             :  * message provides during replication. This function also returns the relation
     791             :  * qualifications to be used in the COPY command.
     792             :  */
     793             : static void
     794         342 : fetch_remote_table_info(char *nspname, char *relname,
     795             :                         LogicalRepRelation *lrel, List **qual)
     796             : {
     797             :     WalRcvExecResult *res;
     798             :     StringInfoData cmd;
     799             :     TupleTableSlot *slot;
     800         342 :     Oid         tableRow[] = {OIDOID, CHAROID, CHAROID};
     801         342 :     Oid         attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
     802         342 :     Oid         qualRow[] = {TEXTOID};
     803             :     bool        isnull;
     804             :     int         natt;
     805             :     ListCell   *lc;
     806         342 :     Bitmapset  *included_cols = NULL;
     807             : 
     808         342 :     lrel->nspname = nspname;
     809         342 :     lrel->relname = relname;
     810             : 
     811             :     /* First fetch Oid and replica identity. */
     812         342 :     initStringInfo(&cmd);
     813         342 :     appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
     814             :                      "  FROM pg_catalog.pg_class c"
     815             :                      "  INNER JOIN pg_catalog.pg_namespace n"
     816             :                      "        ON (c.relnamespace = n.oid)"
     817             :                      " WHERE n.nspname = %s"
     818             :                      "   AND c.relname = %s",
     819             :                      quote_literal_cstr(nspname),
     820             :                      quote_literal_cstr(relname));
     821         342 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
     822             :                       lengthof(tableRow), tableRow);
     823             : 
     824         342 :     if (res->status != WALRCV_OK_TUPLES)
     825           0 :         ereport(ERROR,
     826             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
     827             :                  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
     828             :                         nspname, relname, res->err)));
     829             : 
     830         342 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     831         342 :     if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     832           0 :         ereport(ERROR,
     833             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     834             :                  errmsg("table \"%s.%s\" not found on publisher",
     835             :                         nspname, relname)));
     836             : 
     837         342 :     lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
     838             :     Assert(!isnull);
     839         342 :     lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
     840             :     Assert(!isnull);
     841         342 :     lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
     842             :     Assert(!isnull);
     843             : 
     844         342 :     ExecDropSingleTupleTableSlot(slot);
     845         342 :     walrcv_clear_result(res);
     846             : 
     847             : 
     848             :     /*
     849             :      * Get column lists for each relation.
     850             :      *
     851             :      * We need to do this before fetching info about column names and types,
     852             :      * so that we can skip columns that should not be replicated.
     853             :      */
     854         342 :     if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
     855             :     {
     856             :         WalRcvExecResult *pubres;
     857             :         TupleTableSlot *tslot;
     858         342 :         Oid         attrsRow[] = {INT2VECTOROID};
     859             :         StringInfoData pub_names;
     860             : 
     861         342 :         initStringInfo(&pub_names);
     862        1056 :         foreach(lc, MySubscription->publications)
     863             :         {
     864         714 :             if (foreach_current_index(lc) > 0)
     865         372 :                 appendStringInfoString(&pub_names, ", ");
     866         714 :             appendStringInfoString(&pub_names, quote_literal_cstr(strVal(lfirst(lc))));
     867             :         }
     868             : 
     869             :         /*
     870             :          * Fetch info about column lists for the relation (from all the
     871             :          * publications).
     872             :          */
     873         342 :         resetStringInfo(&cmd);
     874         342 :         appendStringInfo(&cmd,
     875             :                          "SELECT DISTINCT"
     876             :                          "  (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
     877             :                          "   THEN NULL ELSE gpt.attrs END)"
     878             :                          "  FROM pg_publication p,"
     879             :                          "  LATERAL pg_get_publication_tables(p.pubname) gpt,"
     880             :                          "  pg_class c"
     881             :                          " WHERE gpt.relid = %u AND c.oid = gpt.relid"
     882             :                          "   AND p.pubname IN ( %s )",
     883             :                          lrel->remoteid,
     884             :                          pub_names.data);
     885             : 
     886         342 :         pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
     887             :                              lengthof(attrsRow), attrsRow);
     888             : 
     889         342 :         if (pubres->status != WALRCV_OK_TUPLES)
     890           0 :             ereport(ERROR,
     891             :                     (errcode(ERRCODE_CONNECTION_FAILURE),
     892             :                      errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
     893             :                             nspname, relname, pubres->err)));
     894             : 
     895             :         /*
     896             :          * We don't support the case where the column list is different for
     897             :          * the same table when combining publications. See comments atop
     898             :          * fetch_table_list. So there should be only one row returned.
     899             :          * Although we already checked this when creating the subscription, we
     900             :          * still need to check here in case the column list was changed after
     901             :          * creating the subscription and before the sync worker is started.
     902             :          */
     903         342 :         if (tuplestore_tuple_count(pubres->tuplestore) > 1)
     904           0 :             ereport(ERROR,
     905             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     906             :                     errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
     907             :                            nspname, relname));
     908             : 
     909             :         /*
     910             :          * Get the column list and build a single bitmap with the attnums.
     911             :          *
     912             :          * If we find a NULL value, it means all the columns should be
     913             :          * replicated.
     914             :          */
     915         342 :         tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
     916         342 :         if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
     917             :         {
     918         342 :             Datum       cfval = slot_getattr(tslot, 1, &isnull);
     919             : 
     920         342 :             if (!isnull)
     921             :             {
     922             :                 ArrayType  *arr;
     923             :                 int         nelems;
     924             :                 int16      *elems;
     925             : 
     926          38 :                 arr = DatumGetArrayTypeP(cfval);
     927          38 :                 nelems = ARR_DIMS(arr)[0];
     928          38 :                 elems = (int16 *) ARR_DATA_PTR(arr);
     929             : 
     930         106 :                 for (natt = 0; natt < nelems; natt++)
     931          68 :                     included_cols = bms_add_member(included_cols, elems[natt]);
     932             :             }
     933             : 
     934         342 :             ExecClearTuple(tslot);
     935             :         }
     936         342 :         ExecDropSingleTupleTableSlot(tslot);
     937             : 
     938         342 :         walrcv_clear_result(pubres);
     939             : 
     940         342 :         pfree(pub_names.data);
     941             :     }
     942             : 
     943             :     /*
     944             :      * Now fetch column names and types.
     945             :      */
     946         342 :     resetStringInfo(&cmd);
     947         342 :     appendStringInfo(&cmd,
     948             :                      "SELECT a.attnum,"
     949             :                      "       a.attname,"
     950             :                      "       a.atttypid,"
     951             :                      "       a.attnum = ANY(i.indkey)"
     952             :                      "  FROM pg_catalog.pg_attribute a"
     953             :                      "  LEFT JOIN pg_catalog.pg_index i"
     954             :                      "       ON (i.indexrelid = pg_get_replica_identity_index(%u))"
     955             :                      " WHERE a.attnum > 0::pg_catalog.int2"
     956             :                      "   AND NOT a.attisdropped %s"
     957             :                      "   AND a.attrelid = %u"
     958             :                      " ORDER BY a.attnum",
     959             :                      lrel->remoteid,
     960         342 :                      (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
     961             :                       "AND a.attgenerated = ''" : ""),
     962             :                      lrel->remoteid);
     963         342 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
     964             :                       lengthof(attrRow), attrRow);
     965             : 
     966         342 :     if (res->status != WALRCV_OK_TUPLES)
     967           0 :         ereport(ERROR,
     968             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
     969             :                  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
     970             :                         nspname, relname, res->err)));
     971             : 
     972             :     /* We don't know the number of rows coming, so allocate enough space. */
     973         342 :     lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
     974         342 :     lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
     975         342 :     lrel->attkeys = NULL;
     976             : 
     977             :     /*
     978             :      * Store the columns as a list of names.  Ignore those that are not
     979             :      * present in the column list, if there is one.
     980             :      */
     981         342 :     natt = 0;
     982         342 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     983         930 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     984             :     {
     985             :         char       *rel_colname;
     986             :         AttrNumber  attnum;
     987             : 
     988         588 :         attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
     989             :         Assert(!isnull);
     990             : 
     991             :         /* If the column is not in the column list, skip it. */
     992         588 :         if (included_cols != NULL && !bms_is_member(attnum, included_cols))
     993             :         {
     994          44 :             ExecClearTuple(slot);
     995          44 :             continue;
     996             :         }
     997             : 
     998         544 :         rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
     999             :         Assert(!isnull);
    1000             : 
    1001         544 :         lrel->attnames[natt] = rel_colname;
    1002         544 :         lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
    1003             :         Assert(!isnull);
    1004             : 
    1005         544 :         if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
    1006         200 :             lrel->attkeys = bms_add_member(lrel->attkeys, natt);
    1007             : 
    1008             :         /* Should never happen. */
    1009         544 :         if (++natt >= MaxTupleAttributeNumber)
    1010           0 :             elog(ERROR, "too many columns in remote table \"%s.%s\"",
    1011             :                  nspname, relname);
    1012             : 
    1013         544 :         ExecClearTuple(slot);
    1014             :     }
    1015         342 :     ExecDropSingleTupleTableSlot(slot);
    1016             : 
    1017         342 :     lrel->natts = natt;
    1018             : 
    1019         342 :     walrcv_clear_result(res);
    1020             : 
    1021             :     /*
    1022             :      * Get relation's row filter expressions. DISTINCT avoids the same
    1023             :      * expression of a table in multiple publications from being included
    1024             :      * multiple times in the final expression.
    1025             :      *
    1026             :      * We need to copy the row even if it matches just one of the
    1027             :      * publications, so we later combine all the quals with OR.
    1028             :      *
    1029             :      * For initial synchronization, row filtering can be ignored in following
    1030             :      * cases:
    1031             :      *
    1032             :      * 1) one of the subscribed publications for the table hasn't specified
    1033             :      * any row filter
    1034             :      *
    1035             :      * 2) one of the subscribed publications has puballtables set to true
    1036             :      *
    1037             :      * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
    1038             :      * that includes this relation
    1039             :      */
    1040         342 :     if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
    1041             :     {
    1042             :         StringInfoData pub_names;
    1043             : 
    1044             :         /* Build the pubname list. */
    1045         342 :         initStringInfo(&pub_names);
    1046        1398 :         foreach_node(String, pubstr, MySubscription->publications)
    1047             :         {
    1048         714 :             char       *pubname = strVal(pubstr);
    1049             : 
    1050         714 :             if (foreach_current_index(pubstr) > 0)
    1051         372 :                 appendStringInfoString(&pub_names, ", ");
    1052             : 
    1053         714 :             appendStringInfoString(&pub_names, quote_literal_cstr(pubname));
    1054             :         }
    1055             : 
    1056             :         /* Check for row filters. */
    1057         342 :         resetStringInfo(&cmd);
    1058         342 :         appendStringInfo(&cmd,
    1059             :                          "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
    1060             :                          "  FROM pg_publication p,"
    1061             :                          "  LATERAL pg_get_publication_tables(p.pubname) gpt"
    1062             :                          " WHERE gpt.relid = %u"
    1063             :                          "   AND p.pubname IN ( %s )",
    1064             :                          lrel->remoteid,
    1065             :                          pub_names.data);
    1066             : 
    1067         342 :         res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
    1068             : 
    1069         342 :         if (res->status != WALRCV_OK_TUPLES)
    1070           0 :             ereport(ERROR,
    1071             :                     (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
    1072             :                             nspname, relname, res->err)));
    1073             : 
    1074             :         /*
    1075             :          * Multiple row filter expressions for the same table will be combined
    1076             :          * by COPY using OR. If any of the filter expressions for this table
    1077             :          * are null, it means the whole table will be copied. In this case it
    1078             :          * is not necessary to construct a unified row filter expression at
    1079             :          * all.
    1080             :          */
    1081         342 :         slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    1082         370 :         while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    1083             :         {
    1084         350 :             Datum       rf = slot_getattr(slot, 1, &isnull);
    1085             : 
    1086         350 :             if (!isnull)
    1087          28 :                 *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
    1088             :             else
    1089             :             {
    1090             :                 /* Ignore filters and cleanup as necessary. */
    1091         322 :                 if (*qual)
    1092             :                 {
    1093           6 :                     list_free_deep(*qual);
    1094           6 :                     *qual = NIL;
    1095             :                 }
    1096         322 :                 break;
    1097             :             }
    1098             : 
    1099          28 :             ExecClearTuple(slot);
    1100             :         }
    1101         342 :         ExecDropSingleTupleTableSlot(slot);
    1102             : 
    1103         342 :         walrcv_clear_result(res);
    1104             :     }
    1105             : 
    1106         342 :     pfree(cmd.data);
    1107         342 : }
    1108             : 
/*
 * Copy existing data of a table from publisher.
 *
 * Fetches the publisher-side description of 'rel' (column names, relkind and
 * any row filters), registers it in the logical replication relation map,
 * builds a "COPY ... TO STDOUT" command for the publisher, and feeds the
 * resulting COPY stream into a local COPY FROM on 'rel' through the
 * copy_read_data callback.
 *
 * Caller is responsible for locking the local relation.
 *
 * NOTE(review): LogicalRepSyncTableStart calls this inside a local
 * transaction with an active snapshot pushed, and after opening a
 * REPEATABLE READ transaction on the publisher connection; other callers
 * presumably need the same context -- confirm before reusing.
 */
static void
copy_table(Relation rel)
{
    LogicalRepRelMapEntry *relmapentry;
    LogicalRepRelation lrel;
    List       *qual = NIL;     /* row filters as String nodes; NIL = none */
    WalRcvExecResult *res;
    StringInfoData cmd;
    CopyFromState cstate;
    List       *attnamelist;
    ParseState *pstate;
    List       *options = NIL;  /* options for the local COPY FROM */

    /* Get the publisher relation info. */
    fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
                            RelationGetRelationName(rel), &lrel, &qual);

    /* Put the relation into relmap. */
    logicalrep_relmap_update(&lrel);

    /* Map the publisher relation to local one. */
    relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
    Assert(rel == relmapentry->localrel);

    /* Start copy on the publisher. */
    initStringInfo(&cmd);

    /* Regular table with no row filter */
    if (lrel.relkind == RELKIND_RELATION && qual == NIL)
    {
        appendStringInfo(&cmd, "COPY %s",
                         quote_qualified_identifier(lrel.nspname, lrel.relname));

        /* If the table has columns, then specify the columns */
        if (lrel.natts)
        {
            appendStringInfoString(&cmd, " (");

            /*
             * XXX Do we need to list the columns in all cases? Maybe we're
             * replicating all columns?
             */
            for (int i = 0; i < lrel.natts; i++)
            {
                if (i > 0)
                    appendStringInfoString(&cmd, ", ");

                appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
            }

            appendStringInfoChar(&cmd, ')');
        }

        appendStringInfoString(&cmd, " TO STDOUT");
    }
    else
    {
        /*
         * For non-tables and tables with row filters, we need to do COPY
         * (SELECT ...), but we can't just do SELECT * because we need to not
         * copy generated columns. For tables with any row filters, build a
         * SELECT query with OR'ed row filters for COPY.
         */
        appendStringInfoString(&cmd, "COPY (SELECT ");
        /* Note: when lrel.natts is 0 the target list is left empty. */
        for (int i = 0; i < lrel.natts; i++)
        {
            appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
            if (i < lrel.natts - 1)
                appendStringInfoString(&cmd, ", ");
        }

        appendStringInfoString(&cmd, " FROM ");

        /*
         * For regular tables, make sure we don't copy data from a child that
         * inherits the named table as those will be copied separately.
         */
        if (lrel.relkind == RELKIND_RELATION)
            appendStringInfoString(&cmd, "ONLY ");

        appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
        /* list of OR'ed filters */
        if (qual != NIL)
        {
            ListCell   *lc;
            /*
             * Each entry is the pg_get_expr() text returned by the publisher
             * (see fetch_remote_table_info) and is appended verbatim.
             */
            char       *q = strVal(linitial(qual));

            appendStringInfo(&cmd, " WHERE %s", q);
            for_each_from(lc, qual, 1)
            {
                q = strVal(lfirst(lc));
                appendStringInfo(&cmd, " OR %s", q);
            }
            /* The filter text now lives in cmd; release the list. */
            list_free_deep(qual);
        }

        appendStringInfoString(&cmd, ") TO STDOUT");
    }

    /*
     * Prior to v16, initial table synchronization will use text format even
     * if the binary option is enabled for a subscription.
     */
    if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
        MySubscription->binary)
    {
        /*
         * The remote COPY and the local COPY FROM must agree on the format,
         * so the same choice is recorded in both cmd and options.
         */
        appendStringInfoString(&cmd, " WITH (FORMAT binary)");
        options = list_make1(makeDefElem("format",
                                         (Node *) makeString("binary"), -1));
    }

    res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
    pfree(cmd.data);
    if (res->status != WALRCV_OK_COPY_OUT)
        ereport(ERROR,
                (errcode(ERRCODE_CONNECTION_FAILURE),
                 errmsg("could not start initial contents copy for table \"%s.%s\": %s",
                        lrel.nspname, lrel.relname, res->err)));
    walrcv_clear_result(res);

    /*
     * copybuf is file-level state, not declared here; presumably it is the
     * buffer copy_read_data fills from the publisher's COPY stream -- confirm.
     */
    copybuf = makeStringInfo();

    /*
     * Set up a parse state with a range table entry for 'rel' for the local
     * COPY FROM.
     */
    pstate = make_parsestate(NULL);
    (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
                                         NULL, false, false);

    /*
     * Build the local column list from the relation map entry.  Rows come
     * from the copy_read_data callback rather than from a file or program.
     */
    attnamelist = make_copy_attnamelist(relmapentry);
    cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);

    /* Do the copy */
    (void) CopyFrom(cstate);

    logicalrep_rel_close(relmapentry, NoLock);
}
    1248             : 
    1249             : /*
    1250             :  * Determine the tablesync slot name.
    1251             :  *
    1252             :  * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
    1253             :  * on slot name length. We append system_identifier to avoid slot_name
    1254             :  * collision with subscriptions in other clusters. With the current scheme
    1255             :  * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
    1256             :  * length of slot_name will be 50.
    1257             :  *
    1258             :  * The returned slot name is stored in the supplied buffer (syncslotname) with
    1259             :  * the given size.
    1260             :  *
    1261             :  * Note: We don't use the subscription slot name as part of tablesync slot name
    1262             :  * because we are responsible for cleaning up these slots and it could become
    1263             :  * impossible to recalculate what name to cleanup if the subscription slot name
    1264             :  * had changed.
    1265             :  */
    1266             : void
    1267         666 : ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
    1268             :                                 char *syncslotname, Size szslot)
    1269             : {
    1270         666 :     snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
    1271             :              relid, GetSystemIdentifier());
    1272         666 : }
    1273             : 
/*
 * Start syncing the table in the sync worker.
 *
 * If nothing needs to be done to sync the table, we exit the worker without
 * any further action.
 *
 * The returned slot name is palloc'ed in current memory context.
 *
 * The state read from the catalog decides the path taken:
 *   - SYNCDONE / READY / UNKNOWN: the worker exits via finish_sync_worker().
 *   - DATASYNC: a previous attempt failed mid-copy, so any leftover tablesync
 *     slot is dropped on the publisher and the copy is redone from scratch.
 *   - FINISHEDCOPY: the copy already completed, so slot creation and COPY
 *     are skipped and *origin_startpos is taken from the existing origin's
 *     recorded progress.
 *   - INIT (and DATASYNC after the drop): a new permanent slot is created on
 *     the publisher, a replication origin is created and advanced to the
 *     slot's LSN, and the table is copied.  The catalog state then becomes
 *     FINISHEDCOPY.
 *
 * On return, *origin_startpos holds the LSN from which catchup should start.
 * This is either the LSN returned by walrcv_create_slot or the replication
 * origin's progress.  The worker's shared state is SUBREL_STATE_SYNCWAIT
 * with relstate_lsn set to that LSN.  This function does not return until
 * the leader apply worker has moved the state to SUBREL_STATE_CATCHUP.
 *
 * Failures (connection, permission, row-level security, COPY) are raised
 * with ereport(ERROR).
 */
static char *
LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
{
    char       *slotname;
    char       *err;
    char        relstate;
    XLogRecPtr  relstate_lsn;
    Relation    rel;
    AclResult   aclresult;
    WalRcvExecResult *res;
    char        originname[NAMEDATALEN];
    RepOriginId originid;
    UserContext ucxt;
    bool        must_use_password;
    bool        run_as_owner;

    /* Check the state of the table synchronization. */
    StartTransactionCommand();
    relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
                                       MyLogicalRepWorker->relid,
                                       &relstate_lsn);
    CommitTransactionCommand();

    /* Is the use of a password mandatory? */
    must_use_password = MySubscription->passwordrequired &&
        !MySubscription->ownersuperuser;

    /* Publish the catalog state in this worker's shared-memory slot. */
    SpinLockAcquire(&MyLogicalRepWorker->relmutex);
    MyLogicalRepWorker->relstate = relstate;
    MyLogicalRepWorker->relstate_lsn = relstate_lsn;
    SpinLockRelease(&MyLogicalRepWorker->relmutex);

    /*
     * If synchronization is already done or no longer necessary, exit now
     * that we've updated shared memory state.
     *
     * Any other state is expected to be INIT, DATASYNC or FINISHEDCOPY
     * (asserted after connecting below).
     */
    switch (relstate)
    {
        case SUBREL_STATE_SYNCDONE:
        case SUBREL_STATE_READY:
        case SUBREL_STATE_UNKNOWN:
            finish_sync_worker();   /* doesn't return */
    }

    /* Calculate the name of the tablesync slot. */
    slotname = (char *) palloc(NAMEDATALEN);
    ReplicationSlotNameForTablesync(MySubscription->oid,
                                    MyLogicalRepWorker->relid,
                                    slotname,
                                    NAMEDATALEN);

    /*
     * Here we use the slot name instead of the subscription name as the
     * application_name, so that it is different from the leader apply worker,
     * so that synchronous replication can distinguish them.
     */
    LogRepWorkerWalRcvConn =
        walrcv_connect(MySubscription->conninfo, true, true,
                       must_use_password,
                       slotname, &err);
    if (LogRepWorkerWalRcvConn == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_CONNECTION_FAILURE),
                 errmsg("could not connect to the publisher: %s", err)));

    Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
           MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
           MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);

    /* Assign the origin tracking record name. */
    ReplicationOriginNameForLogicalRep(MySubscription->oid,
                                       MyLogicalRepWorker->relid,
                                       originname,
                                       sizeof(originname));

    if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
    {
        /*
         * We have previously errored out before finishing the copy so the
         * replication slot might exist. We want to remove the slot if it
         * already exists and proceed.
         *
         * XXX We could also instead try to drop the slot, last time we failed
         * but for that, we might need to clean up the copy state as it might
         * be in the middle of fetching the rows. Also, if there is a network
         * breakdown then it wouldn't have succeeded so trying it next time
         * seems like a better bet.
         */
        ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
    }
    else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
    {
        /*
         * The COPY phase was previously done, but tablesync then crashed
         * before it was able to finish normally.
         */
        StartTransactionCommand();

        /*
         * The origin tracking name must already exist. It was created first
         * time this tablesync was launched.
         */
        originid = replorigin_by_name(originname, false);
        replorigin_session_setup(originid, 0);
        replorigin_session_origin = originid;
        *origin_startpos = replorigin_session_get_progress(false);

        CommitTransactionCommand();

        /* Skip slot creation and the COPY; go straight to SYNCWAIT. */
        goto copy_table_done;
    }

    /* From here on the copy is (re)started: mark the relation DATASYNC. */
    SpinLockAcquire(&MyLogicalRepWorker->relmutex);
    MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
    MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
    SpinLockRelease(&MyLogicalRepWorker->relmutex);

    /* Update the state and make it visible to others. */
    StartTransactionCommand();
    UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                               MyLogicalRepWorker->relid,
                               MyLogicalRepWorker->relstate,
                               MyLogicalRepWorker->relstate_lsn);
    CommitTransactionCommand();
    pgstat_report_stat(true);

    /*
     * This local transaction covers the COPY and the FINISHEDCOPY catalog
     * update below; it is committed just before copy_table_done.
     */
    StartTransactionCommand();

    /*
     * Use a standard write lock here. It might be better to disallow access
     * to the table while it's being synchronized. But we don't want to block
     * the main apply process from working and it has to open the relation in
     * RowExclusiveLock when remapping remote relation id to local one.
     */
    rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);

    /*
     * Start a transaction in the remote node in REPEATABLE READ mode.  This
     * ensures that both the replication slot we create (see below) and the
     * COPY are consistent with each other.
     */
    res = walrcv_exec(LogRepWorkerWalRcvConn,
                      "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
                      0, NULL);
    if (res->status != WALRCV_OK_COMMAND)
        ereport(ERROR,
                (errcode(ERRCODE_CONNECTION_FAILURE),
                 errmsg("table copy could not start transaction on publisher: %s",
                        res->err)));
    walrcv_clear_result(res);

    /*
     * Create a new permanent logical decoding slot. This slot will be used
     * for the catchup phase after COPY is done, so tell it to use the
     * snapshot to make the final data consistent.
     *
     * The slot's start LSN is returned through origin_startpos.
     */
    walrcv_create_slot(LogRepWorkerWalRcvConn,
                       slotname, false /* permanent */ , false /* two_phase */ ,
                       MySubscription->failover,
                       CRS_USE_SNAPSHOT, origin_startpos);

    /*
     * Setup replication origin tracking. The purpose of doing this before the
     * copy is to avoid doing the copy again due to any error in setting up
     * origin tracking.
     */
    originid = replorigin_by_name(originname, true);
    if (!OidIsValid(originid))
    {
        /*
         * Origin tracking does not exist, so create it now.
         *
         * Then advance to the LSN got from walrcv_create_slot. This is WAL
         * logged for the purpose of recovery. Locks are to prevent the
         * replication origin from vanishing while advancing.
         */
        originid = replorigin_create(originname);

        LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
        replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
                           true /* go backward */ , true /* WAL log */ );
        UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);

        replorigin_session_setup(originid, 0);
        replorigin_session_origin = originid;
    }
    else
    {
        /*
         * An origin left over from an earlier attempt is treated as an
         * error here rather than being reused.
         */
        ereport(ERROR,
                (errcode(ERRCODE_DUPLICATE_OBJECT),
                 errmsg("replication origin \"%s\" already exists",
                        originname)));
    }

    /*
     * Make sure that the copy command runs as the table owner, unless the
     * user has opted out of that behaviour.
     */
    run_as_owner = MySubscription->runasowner;
    if (!run_as_owner)
        SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);

    /*
     * Check that our table sync worker has permission to insert into the
     * target table.
     */
    aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
                                  ACL_INSERT);
    if (aclresult != ACLCHECK_OK)
        aclcheck_error(aclresult,
                       get_relkind_objtype(rel->rd_rel->relkind),
                       RelationGetRelationName(rel));

    /*
     * COPY FROM does not honor RLS policies.  That is not a problem for
     * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
     * who has it implicitly), but other roles should not be able to
     * circumvent RLS.  Disallow logical replication into RLS enabled
     * relations for such roles.
     */
    if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
                        GetUserNameFromId(GetUserId(), true),
                        RelationGetRelationName(rel))));

    /* Now do the initial data copy */
    PushActiveSnapshot(GetTransactionSnapshot());
    copy_table(rel);
    PopActiveSnapshot();

    /* End the REPEATABLE READ transaction opened on the publisher above. */
    res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
    if (res->status != WALRCV_OK_COMMAND)
        ereport(ERROR,
                (errcode(ERRCODE_CONNECTION_FAILURE),
                 errmsg("table copy could not finish transaction on publisher: %s",
                        res->err)));
    walrcv_clear_result(res);

    if (!run_as_owner)
        RestoreUserContext(&ucxt);

    /* Keep the RowExclusiveLock until the local transaction commits. */
    table_close(rel, NoLock);

    /* Make the copy visible. */
    CommandCounterIncrement();

    /*
     * Update the persisted state to indicate the COPY phase is done; make it
     * visible to others.
     *
     * NOTE(review): relstate_lsn was set to InvalidXLogRecPtr above and this
     * function does not change it in between; presumably that is the LSN
     * stored here -- confirm no other process writes it.
     */
    UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                               MyLogicalRepWorker->relid,
                               SUBREL_STATE_FINISHEDCOPY,
                               MyLogicalRepWorker->relstate_lsn);

    CommitTransactionCommand();

copy_table_done:

    elog(DEBUG1,
         "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
         originname, LSN_FORMAT_ARGS(*origin_startpos));

    /*
     * We are done with the initial data synchronization, update the state.
     */
    SpinLockAcquire(&MyLogicalRepWorker->relmutex);
    MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
    MyLogicalRepWorker->relstate_lsn = *origin_startpos;
    SpinLockRelease(&MyLogicalRepWorker->relmutex);

    /*
     * Finally, wait until the leader apply worker tells us to catch up and
     * then return to let LogicalRepApplyLoop do it.
     */
    wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
    return slotname;
}
    1562             : 
    1563             : /*
    1564             :  * Common code to fetch the up-to-date sync state info into the static lists.
    1565             :  *
    1566             :  * Returns true if subscription has 1 or more tables, else false.
    1567             :  *
    1568             :  * Note: If this function started the transaction (indicated by the parameter)
    1569             :  * then it is the caller's responsibility to commit it.
    1570             :  */
    1571             : static bool
    1572        8896 : FetchTableStates(bool *started_tx)
    1573             : {
    1574             :     static bool has_subrels = false;
    1575             : 
    1576        8896 :     *started_tx = false;
    1577             : 
    1578        8896 :     if (table_states_validity != SYNC_TABLE_STATE_VALID)
    1579             :     {
    1580             :         MemoryContext oldctx;
    1581             :         List       *rstates;
    1582             :         ListCell   *lc;
    1583             :         SubscriptionRelState *rstate;
    1584             : 
    1585        1450 :         table_states_validity = SYNC_TABLE_STATE_REBUILD_STARTED;
    1586             : 
    1587             :         /* Clean the old lists. */
    1588        1450 :         list_free_deep(table_states_not_ready);
    1589        1450 :         table_states_not_ready = NIL;
    1590             : 
    1591        1450 :         if (!IsTransactionState())
    1592             :         {
    1593        1414 :             StartTransactionCommand();
    1594        1414 :             *started_tx = true;
    1595             :         }
    1596             : 
    1597             :         /* Fetch all non-ready tables. */
    1598        1450 :         rstates = GetSubscriptionRelations(MySubscription->oid, true);
    1599             : 
    1600             :         /* Allocate the tracking info in a permanent memory context. */
    1601        1450 :         oldctx = MemoryContextSwitchTo(CacheMemoryContext);
    1602        3686 :         foreach(lc, rstates)
    1603             :         {
    1604        2236 :             rstate = palloc(sizeof(SubscriptionRelState));
    1605        2236 :             memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
    1606        2236 :             table_states_not_ready = lappend(table_states_not_ready, rstate);
    1607             :         }
    1608        1450 :         MemoryContextSwitchTo(oldctx);
    1609             : 
    1610             :         /*
    1611             :          * Does the subscription have tables?
    1612             :          *
    1613             :          * If there were not-READY relations found then we know it does. But
    1614             :          * if table_states_not_ready was empty we still need to check again to
    1615             :          * see if there are 0 tables.
    1616             :          */
    1617        1840 :         has_subrels = (table_states_not_ready != NIL) ||
    1618         390 :             HasSubscriptionRelations(MySubscription->oid);
    1619             : 
    1620             :         /*
    1621             :          * If the subscription relation cache has been invalidated since we
    1622             :          * entered this routine, we still use and return the relations we just
    1623             :          * finished constructing, to avoid infinite loops, but we leave the
    1624             :          * table states marked as stale so that we'll rebuild it again on next
    1625             :          * access. Otherwise, we mark the table states as valid.
    1626             :          */
    1627        1450 :         if (table_states_validity == SYNC_TABLE_STATE_REBUILD_STARTED)
    1628        1450 :             table_states_validity = SYNC_TABLE_STATE_VALID;
    1629             :     }
    1630             : 
    1631        8896 :     return has_subrels;
    1632             : }
    1633             : 
    1634             : /*
    1635             :  * Execute the initial sync with error handling. Disable the subscription,
    1636             :  * if it's required.
    1637             :  *
    1638             :  * Allocate the slot name in long-lived context on return. Note that we don't
    1639             :  * handle FATAL errors which are probably because of system resource error and
    1640             :  * are not repeatable.
    1641             :  */
    1642             : static void
    1643         342 : start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
    1644             : {
    1645         342 :     char       *sync_slotname = NULL;
    1646             : 
    1647             :     Assert(am_tablesync_worker());
    1648             : 
    1649         342 :     PG_TRY();
    1650             :     {
    1651             :         /* Call initial sync. */
    1652         342 :         sync_slotname = LogicalRepSyncTableStart(origin_startpos);
    1653             :     }
    1654          22 :     PG_CATCH();
    1655             :     {
    1656          22 :         if (MySubscription->disableonerr)
    1657           2 :             DisableSubscriptionAndExit();
    1658             :         else
    1659             :         {
    1660             :             /*
    1661             :              * Report the worker failed during table synchronization. Abort
    1662             :              * the current transaction so that the stats message is sent in an
    1663             :              * idle state.
    1664             :              */
    1665          20 :             AbortOutOfAnyTransaction();
    1666          20 :             pgstat_report_subscription_error(MySubscription->oid, false);
    1667             : 
    1668          20 :             PG_RE_THROW();
    1669             :         }
    1670             :     }
    1671         320 :     PG_END_TRY();
    1672             : 
    1673             :     /* allocate slot name in long-lived context */
    1674         320 :     *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
    1675         320 :     pfree(sync_slotname);
    1676         320 : }
    1677             : 
    1678             : /*
    1679             :  * Runs the tablesync worker.
    1680             :  *
    1681             :  * It starts syncing tables. After a successful sync, sets streaming options
    1682             :  * and starts streaming to catchup with apply worker.
    1683             :  */
    1684             : static void
    1685         342 : run_tablesync_worker()
    1686             : {
    1687             :     char        originname[NAMEDATALEN];
    1688         342 :     XLogRecPtr  origin_startpos = InvalidXLogRecPtr;
    1689         342 :     char       *slotname = NULL;
    1690             :     WalRcvStreamOptions options;
    1691             : 
    1692         342 :     start_table_sync(&origin_startpos, &slotname);
    1693             : 
    1694         320 :     ReplicationOriginNameForLogicalRep(MySubscription->oid,
    1695         320 :                                        MyLogicalRepWorker->relid,
    1696             :                                        originname,
    1697             :                                        sizeof(originname));
    1698             : 
    1699         320 :     set_apply_error_context_origin(originname);
    1700             : 
    1701         320 :     set_stream_options(&options, slotname, &origin_startpos);
    1702             : 
    1703         320 :     walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
    1704             : 
    1705             :     /* Apply the changes till we catchup with the apply worker. */
    1706         320 :     start_apply(origin_startpos);
    1707           0 : }
    1708             : 
    1709             : /* Logical Replication Tablesync worker entry point */
    1710             : void
    1711         342 : TablesyncWorkerMain(Datum main_arg)
    1712             : {
    1713         342 :     int         worker_slot = DatumGetInt32(main_arg);
    1714             : 
    1715         342 :     SetupApplyOrSyncWorker(worker_slot);
    1716             : 
    1717         342 :     run_tablesync_worker();
    1718             : 
    1719           0 :     finish_sync_worker();
    1720             : }
    1721             : 
    1722             : /*
    1723             :  * If the subscription has no tables then return false.
    1724             :  *
    1725             :  * Otherwise, are all tablesyncs READY?
    1726             :  *
    1727             :  * Note: This function is not suitable to be called from outside of apply or
    1728             :  * tablesync workers because MySubscription needs to be already initialized.
    1729             :  */
    1730             : bool
    1731         126 : AllTablesyncsReady(void)
    1732             : {
    1733         126 :     bool        started_tx = false;
    1734         126 :     bool        has_subrels = false;
    1735             : 
    1736             :     /* We need up-to-date sync state info for subscription tables here. */
    1737         126 :     has_subrels = FetchTableStates(&started_tx);
    1738             : 
    1739         126 :     if (started_tx)
    1740             :     {
    1741          22 :         CommitTransactionCommand();
    1742          22 :         pgstat_report_stat(true);
    1743             :     }
    1744             : 
    1745             :     /*
    1746             :      * Return false when there are no tables in subscription or not all tables
    1747             :      * are in ready state; true otherwise.
    1748             :      */
    1749         126 :     return has_subrels && (table_states_not_ready == NIL);
    1750             : }
    1751             : 
    1752             : /*
    1753             :  * Update the two_phase state of the specified subscription in pg_subscription.
    1754             :  */
    1755             : void
    1756          10 : UpdateTwoPhaseState(Oid suboid, char new_state)
    1757             : {
    1758             :     Relation    rel;
    1759             :     HeapTuple   tup;
    1760             :     bool        nulls[Natts_pg_subscription];
    1761             :     bool        replaces[Natts_pg_subscription];
    1762             :     Datum       values[Natts_pg_subscription];
    1763             : 
    1764             :     Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
    1765             :            new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
    1766             :            new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
    1767             : 
    1768          10 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    1769          10 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
    1770          10 :     if (!HeapTupleIsValid(tup))
    1771           0 :         elog(ERROR,
    1772             :              "cache lookup failed for subscription oid %u",
    1773             :              suboid);
    1774             : 
    1775             :     /* Form a new tuple. */
    1776          10 :     memset(values, 0, sizeof(values));
    1777          10 :     memset(nulls, false, sizeof(nulls));
    1778          10 :     memset(replaces, false, sizeof(replaces));
    1779             : 
    1780             :     /* And update/set two_phase state */
    1781          10 :     values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
    1782          10 :     replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
    1783             : 
    1784          10 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel),
    1785             :                             values, nulls, replaces);
    1786          10 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    1787             : 
    1788          10 :     heap_freetuple(tup);
    1789          10 :     table_close(rel, RowExclusiveLock);
    1790          10 : }

Generated by: LCOV version 1.14