LCOV - code coverage report
Current view: top level - src/backend/replication/logical - tablesync.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 266 311 85.5 %
Date: 2020-05-25 06:06:29 Functions: 11 12 91.7 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * tablesync.c
       3             :  *    PostgreSQL logical replication
       4             :  *
       5             :  * Copyright (c) 2012-2020, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/tablesync.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file contains code for initial table data synchronization for
      12             :  *    logical replication.
      13             :  *
      14             :  *    The initial data synchronization is done separately for each table,
      15             :  *    in a separate apply worker that only fetches the initial snapshot data
      16             :  *    from the publisher and then synchronizes the position in the stream with
      17             :  *    the main apply worker.
      18             :  *
      19             :  *    There are several reasons for doing the synchronization this way:
      20             :  *     - It allows us to parallelize the initial data synchronization
      21             :  *       which lowers the time needed for it to happen.
      22             :  *     - The initial synchronization does not have to hold the xid and LSN
      23             :  *       for the time it takes to copy data of all tables, causing less
      24             :  *       bloat and lower disk consumption compared to doing the
      25             :  *       synchronization in a single process for the whole database.
      26             :  *     - It allows us to synchronize any tables added after the initial
      27             :  *       synchronization has finished.
      28             :  *
      29             :  *    The stream position synchronization works in multiple steps.
      30             :  *     - Sync finishes copy and sets worker state as SYNCWAIT and waits for
      31             :  *       state to change in a loop.
      32             :  *     - Apply periodically checks tables that are synchronizing for SYNCWAIT.
      33             :  *       When the desired state appears, it will set the worker state to
      34             :  *       CATCHUP and start loop-waiting until either the table state is set
      35             :  *       to SYNCDONE or the sync worker exits.
      36             :  *     - After the sync worker has seen the state change to CATCHUP, it will
      37             :  *       read the stream and apply changes (acting like an apply worker) until
      38             :  *       it catches up to the specified stream position.  Then it sets the
      39             :  *       state to SYNCDONE.  There might be zero changes applied between
      40             :  *       CATCHUP and SYNCDONE, because the sync worker might be ahead of the
      41             :  *       apply worker.
      42             :  *     - Once the state is set to SYNCDONE, the apply will continue tracking
      43             :  *       the table until it reaches the SYNCDONE stream position, at which
      44             :  *       point it sets state to READY and stops tracking.  Again, there might
      45             :  *       be zero changes in between.
      46             :  *
      47             :  *    So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> CATCHUP ->
      48             :  *    SYNCDONE -> READY.
      49             :  *
      50             :  *    The catalog pg_subscription_rel is used to keep information about
      51             :  *    subscribed tables and their state.  Some transient state during data
      52             :  *    synchronization is kept in shared memory.  The states SYNCWAIT and
      53             :  *    CATCHUP only appear in memory.
      54             :  *
      55             :  *    Example flows look like this:
      56             :  *     - Apply is in front:
      57             :  *        sync:8
      58             :  *          -> set in memory SYNCWAIT
      59             :  *        apply:10
      60             :  *          -> set in memory CATCHUP
      61             :  *          -> enter wait-loop
      62             :  *        sync:10
      63             :  *          -> set in catalog SYNCDONE
      64             :  *          -> exit
      65             :  *        apply:10
      66             :  *          -> exit wait-loop
      67             :  *          -> continue rep
      68             :  *        apply:11
      69             :  *          -> set in catalog READY
      70             :  *     - Sync in front:
      71             :  *        sync:10
      72             :  *          -> set in memory SYNCWAIT
      73             :  *        apply:8
      74             :  *          -> set in memory CATCHUP
      75             :  *          -> continue per-table filtering
      76             :  *        sync:10
      77             :  *          -> set in catalog SYNCDONE
      78             :  *          -> exit
      79             :  *        apply:10
      80             :  *          -> set in catalog READY
      81             :  *          -> stop per-table filtering
      82             :  *          -> continue rep
      83             :  *-------------------------------------------------------------------------
      84             :  */
      85             : 
      86             : #include "postgres.h"
      87             : 
      88             : #include "access/table.h"
      89             : #include "access/xact.h"
      90             : #include "catalog/pg_subscription_rel.h"
      91             : #include "catalog/pg_type.h"
      92             : #include "commands/copy.h"
      93             : #include "miscadmin.h"
      94             : #include "parser/parse_relation.h"
      95             : #include "pgstat.h"
      96             : #include "replication/logicallauncher.h"
      97             : #include "replication/logicalrelation.h"
      98             : #include "replication/walreceiver.h"
      99             : #include "replication/worker_internal.h"
     100             : #include "storage/ipc.h"
     101             : #include "utils/builtins.h"
     102             : #include "utils/lsyscache.h"
     103             : #include "utils/memutils.h"
     104             : #include "utils/snapmgr.h"
     105             : 
     106             : static bool table_states_valid = false;
     107             : 
     108             : StringInfo  copybuf = NULL;
     109             : 
     110             : /*
     111             :  * Exit routine for synchronization worker.
     112             :  */
     113             : static void
     114             : pg_attribute_noreturn()
     115          98 : finish_sync_worker(void)
     116             : {
     117             :     /*
     118             :      * Commit any outstanding transaction. This is the usual case, unless
     119             :      * there was nothing to do for the table.
     120             :      */
     121          98 :     if (IsTransactionState())
     122             :     {
     123          98 :         CommitTransactionCommand();
     124          98 :         pgstat_report_stat(false);
     125             :     }
     126             : 
     127             :     /* And flush all writes. */
     128          98 :     XLogFlush(GetXLogWriteRecPtr());
     129             : 
     130          98 :     StartTransactionCommand();
     131          98 :     ereport(LOG,
     132             :             (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
     133             :                     MySubscription->name,
     134             :                     get_rel_name(MyLogicalRepWorker->relid))));
     135          98 :     CommitTransactionCommand();
     136             : 
     137             :     /* Find the main apply worker and signal it. */
     138          98 :     logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
     139             : 
     140             :     /* Stop gracefully */
     141          98 :     proc_exit(0);
     142             : }
     143             : 
     144             : /*
     145             :  * Wait until the relation synchronization state is set in the catalog to the
     146             :  * expected one.
     147             :  *
     148             :  * Used when transitioning from CATCHUP state to SYNCDONE.
     149             :  *
     150             :  * Returns false if the synchronization worker has disappeared or the table state
     151             :  * has been reset.
     152             :  */
     153             : static bool
     154         194 : wait_for_relation_state_change(Oid relid, char expected_state)
     155             : {
     156             :     char        state;
     157             : 
     158             :     for (;;)
     159          96 :     {
     160             :         LogicalRepWorker *worker;
     161             :         XLogRecPtr  statelsn;
     162             : 
     163         194 :         CHECK_FOR_INTERRUPTS();
     164             : 
     165             :         /* XXX use cache invalidation here to improve performance? */
     166         194 :         PushActiveSnapshot(GetLatestSnapshot());
     167         194 :         state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
     168             :                                         relid, &statelsn, true);
     169         194 :         PopActiveSnapshot();
     170             : 
     171         194 :         if (state == SUBREL_STATE_UNKNOWN)
     172          98 :             return false;
     173             : 
     174         194 :         if (state == expected_state)
     175          96 :             return true;
     176             : 
     177             :         /* Hold LogicalRepWorkerLock while looking up the other worker. */
     178          98 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     179             : 
     180             :         /* Check if the opposite worker is still running and bail if not. */
     181          98 :         worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
     182          98 :                                         am_tablesync_worker() ? InvalidOid : relid,
     183             :                                         false);
     184          98 :         LWLockRelease(LogicalRepWorkerLock);
     185          98 :         if (!worker)
     186           2 :             return false;
     187             : 
     188          96 :         (void) WaitLatch(MyLatch,
     189             :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     190             :                          1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
     191             : 
     192          96 :         ResetLatch(MyLatch);
     193             :     }
     194             : 
     195             :     return false;
     196             : }
     197             : 
     198             : /*
     199             :  * Wait until the apply worker changes the state of our synchronization
     200             :  * worker to the expected one.
     201             :  *
     202             :  * Used when transitioning from SYNCWAIT state to CATCHUP.
     203             :  *
     204             :  * Returns false if the apply worker has disappeared.
     205             :  */
     206             : static bool
     207         196 : wait_for_worker_state_change(char expected_state)
     208             : {
     209             :     int         rc;
     210             : 
     211             :     for (;;)
     212          98 :     {
     213             :         LogicalRepWorker *worker;
     214             : 
     215         196 :         CHECK_FOR_INTERRUPTS();
     216             : 
     217             :         /*
     218             :          * Done if already in correct state.  (We assume this fetch is atomic
     219             :          * enough to not give a misleading answer if we do it with no lock.)
     220             :          */
     221         196 :         if (MyLogicalRepWorker->relstate == expected_state)
     222          98 :             return true;
     223             : 
     224             :         /*
     225             :          * Bail out if the apply worker has died, else signal it that we're
     226             :          * waiting.
     227             :          */
     228          98 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     229          98 :         worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
     230             :                                         InvalidOid, false);
     231          98 :         if (worker && worker->proc)
     232          98 :             logicalrep_worker_wakeup_ptr(worker);
     233          98 :         LWLockRelease(LogicalRepWorkerLock);
     234          98 :         if (!worker)
     235           0 :             break;
     236             : 
     237             :         /*
     238             :          * Wait.  We expect to get a latch signal back from the apply worker,
     239             :          * but use a timeout in case it dies without sending one.
     240             :          */
     241          98 :         rc = WaitLatch(MyLatch,
     242             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     243             :                        1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
     244             : 
     245          98 :         if (rc & WL_LATCH_SET)
     246          98 :             ResetLatch(MyLatch);
     247             :     }
     248             : 
     249           0 :     return false;
     250             : }
     251             : 
     252             : /*
     253             :  * Callback from syscache invalidation.
     254             :  */
     255             : void
     256         372 : invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
     257             : {
     258         372 :     table_states_valid = false;
     259         372 : }
     260             : 
     261             : /*
     262             :  * Handle table synchronization cooperation from the synchronization
     263             :  * worker.
     264             :  *
     265             :  * If the sync worker is in CATCHUP state and reached (or passed) the
     266             :  * predetermined synchronization point in the WAL stream, mark the table as
     267             :  * SYNCDONE and finish.
     268             :  */
     269             : static void
     270           0 : process_syncing_tables_for_sync(XLogRecPtr current_lsn)
     271             : {
     272             :     Assert(IsTransactionState());
     273             : 
     274           0 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
     275             : 
     276           0 :     if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
     277           0 :         current_lsn >= MyLogicalRepWorker->relstate_lsn)
     278             :     {
     279             :         TimeLineID  tli;
     280             : 
     281           0 :         MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
     282           0 :         MyLogicalRepWorker->relstate_lsn = current_lsn;
     283             : 
     284           0 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
     285             : 
     286           0 :         UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
     287           0 :                                    MyLogicalRepWorker->relid,
     288           0 :                                    MyLogicalRepWorker->relstate,
     289           0 :                                    MyLogicalRepWorker->relstate_lsn);
     290             : 
     291           0 :         walrcv_endstreaming(wrconn, &tli);
     292           0 :         finish_sync_worker();
     293             :     }
     294             :     else
     295           0 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
     296           0 : }
     297             : 
     298             : /*
     299             :  * Handle table synchronization cooperation from the apply worker.
     300             :  *
     301             :  * Walk over all subscription tables that are individually tracked by the
     302             :  * apply process (currently, all that have state other than
     303             :  * SUBREL_STATE_READY) and manage synchronization for them.
     304             :  *
     305             :  * If there are tables that need synchronizing and are not being synchronized
     306             :  * yet, start sync workers for them (if there are free slots for sync
     307             :  * workers).  To prevent starting the sync worker for the same relation at a
     308             :  * high frequency after a failure, we store its last start time with each sync
     309             :  * state info.  We start the sync worker for the same relation after waiting
     310             :  * at least wal_retrieve_retry_interval.
     311             :  *
     312             :  * For tables that are being synchronized already, check if sync workers
     313             :  * either need action from the apply worker or have finished.  This is the
     314             :  * SYNCWAIT to CATCHUP transition.
     315             :  *
     316             :  * If the synchronization position is reached (SYNCDONE), then the table can
     317             :  * be marked as READY and is no longer tracked.
     318             :  */
     319             : static void
     320        7596 : process_syncing_tables_for_apply(XLogRecPtr current_lsn)
     321             : {
     322             :     struct tablesync_start_time_mapping
     323             :     {
     324             :         Oid         relid;
     325             :         TimestampTz last_start_time;
     326             :     };
     327             :     static List *table_states = NIL;
     328             :     static HTAB *last_start_times = NULL;
     329             :     ListCell   *lc;
     330        7596 :     bool        started_tx = false;
     331             : 
     332             :     Assert(!IsTransactionState());
     333             : 
     334             :     /* We need up-to-date sync state info for subscription tables here. */
     335        7596 :     if (!table_states_valid)
     336             :     {
     337             :         MemoryContext oldctx;
     338             :         List       *rstates;
     339             :         ListCell   *lc;
     340             :         SubscriptionRelState *rstate;
     341             : 
     342             :         /* Clean the old list. */
     343         308 :         list_free_deep(table_states);
     344         308 :         table_states = NIL;
     345             : 
     346         308 :         StartTransactionCommand();
     347         308 :         started_tx = true;
     348             : 
     349             :         /* Fetch all non-ready tables. */
     350         308 :         rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
     351             : 
     352             :         /* Allocate the tracking info in a permanent memory context. */
     353         308 :         oldctx = MemoryContextSwitchTo(CacheMemoryContext);
     354        1100 :         foreach(lc, rstates)
     355             :         {
     356         792 :             rstate = palloc(sizeof(SubscriptionRelState));
     357         792 :             memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
     358         792 :             table_states = lappend(table_states, rstate);
     359             :         }
     360         308 :         MemoryContextSwitchTo(oldctx);
     361             : 
     362         308 :         table_states_valid = true;
     363             :     }
     364             : 
     365             :     /*
     366             :      * Prepare a hash table for tracking last start times of workers, to avoid
     367             :      * immediate restarts.  We don't need it if there are no tables that need
     368             :      * syncing.
     369             :      */
     370        7596 :     if (table_states && !last_start_times)
     371          40 :     {
     372             :         HASHCTL     ctl;
     373             : 
     374          40 :         memset(&ctl, 0, sizeof(ctl));
     375          40 :         ctl.keysize = sizeof(Oid);
     376          40 :         ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
     377          40 :         last_start_times = hash_create("Logical replication table sync worker start times",
     378             :                                        256, &ctl, HASH_ELEM | HASH_BLOBS);
     379             :     }
     380             : 
     381             :     /*
     382             :      * Clean up the hash table when we're done with all tables (just to
     383             :      * release the bit of memory).
     384             :      */
     385        7556 :     else if (!table_states && last_start_times)
     386             :     {
     387          36 :         hash_destroy(last_start_times);
     388          36 :         last_start_times = NULL;
     389             :     }
     390             : 
     391             :     /*
     392             :      * Process all tables that are being synchronized.
     393             :      */
     394        8606 :     foreach(lc, table_states)
     395             :     {
     396        1010 :         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
     397             : 
     398        1010 :         if (rstate->state == SUBREL_STATE_SYNCDONE)
     399             :         {
     400             :             /*
     401             :              * Apply has caught up to the position where the table sync has
     402             :              * finished.  Mark the table as ready so that the apply will just
     403             :              * continue to replicate it normally.
     404             :              */
     405          98 :             if (current_lsn >= rstate->lsn)
     406             :             {
     407          98 :                 rstate->state = SUBREL_STATE_READY;
     408          98 :                 rstate->lsn = current_lsn;
     409          98 :                 if (!started_tx)
     410             :                 {
     411           0 :                     StartTransactionCommand();
     412           0 :                     started_tx = true;
     413             :                 }
     414             : 
     415         196 :                 UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
     416          98 :                                            rstate->relid, rstate->state,
     417             :                                            rstate->lsn);
     418             :             }
     419             :         }
     420             :         else
     421             :         {
     422             :             LogicalRepWorker *syncworker;
     423             : 
     424             :             /*
     425             :              * Look for a sync worker for this relation.
     426             :              */
     427         912 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     428             : 
     429         912 :             syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
     430             :                                                 rstate->relid, false);
     431             : 
     432         912 :             if (syncworker)
     433             :             {
     434             :                 /* Found one, update our copy of its state */
     435         280 :                 SpinLockAcquire(&syncworker->relmutex);
     436         280 :                 rstate->state = syncworker->relstate;
     437         280 :                 rstate->lsn = syncworker->relstate_lsn;
     438         280 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
     439             :                 {
     440             :                     /*
     441             :                      * Sync worker is waiting for apply.  Tell sync worker it
     442             :                      * can catchup now.
     443             :                      */
     444          98 :                     syncworker->relstate = SUBREL_STATE_CATCHUP;
     445          98 :                     syncworker->relstate_lsn =
     446          98 :                         Max(syncworker->relstate_lsn, current_lsn);
     447             :                 }
     448         280 :                 SpinLockRelease(&syncworker->relmutex);
     449             : 
     450             :                 /* If we told worker to catch up, wait for it. */
     451         280 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
     452             :                 {
     453             :                     /* Signal the sync worker, as it may be waiting for us. */
     454          98 :                     if (syncworker->proc)
     455          98 :                         logicalrep_worker_wakeup_ptr(syncworker);
     456             : 
     457             :                     /* Now safe to release the LWLock */
     458          98 :                     LWLockRelease(LogicalRepWorkerLock);
     459             : 
     460             :                     /*
     461             :                      * Enter busy loop and wait for synchronization worker to
     462             :                      * reach expected state (or die trying).
     463             :                      */
     464          98 :                     if (!started_tx)
     465             :                     {
     466          52 :                         StartTransactionCommand();
     467          52 :                         started_tx = true;
     468             :                     }
     469             : 
     470          98 :                     wait_for_relation_state_change(rstate->relid,
     471             :                                                    SUBREL_STATE_SYNCDONE);
     472             :                 }
     473             :                 else
     474         182 :                     LWLockRelease(LogicalRepWorkerLock);
     475             :             }
     476             :             else
     477             :             {
     478             :                 /*
     479             :                  * If there is no sync worker for this table yet, count
     480             :                  * running sync workers for this subscription, while we have
     481             :                  * the lock.
     482             :                  */
     483             :                 int         nsyncworkers =
     484         632 :                 logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
     485             : 
     486             :                 /* Now safe to release the LWLock */
     487         632 :                 LWLockRelease(LogicalRepWorkerLock);
     488             : 
     489             :                 /*
     490             :                  * If there are free sync worker slot(s), start a new sync
     491             :                  * worker for the table.
     492             :                  */
     493         632 :                 if (nsyncworkers < max_sync_workers_per_subscription)
     494             :                 {
     495         122 :                     TimestampTz now = GetCurrentTimestamp();
     496             :                     struct tablesync_start_time_mapping *hentry;
     497             :                     bool        found;
     498             : 
     499         122 :                     hentry = hash_search(last_start_times, &rstate->relid,
     500             :                                          HASH_ENTER, &found);
     501             : 
     502         146 :                     if (!found ||
     503          24 :                         TimestampDifferenceExceeds(hentry->last_start_time, now,
     504             :                                                    wal_retrieve_retry_interval))
     505             :                     {
     506         324 :                         logicalrep_worker_launch(MyLogicalRepWorker->dbid,
     507         108 :                                                  MySubscription->oid,
     508         108 :                                                  MySubscription->name,
     509         108 :                                                  MyLogicalRepWorker->userid,
     510             :                                                  rstate->relid);
     511         108 :                         hentry->last_start_time = now;
     512             :                     }
     513             :                 }
     514             :             }
     515             :         }
     516             :     }
     517             : 
     518        7596 :     if (started_tx)
     519             :     {
     520         360 :         CommitTransactionCommand();
     521         360 :         pgstat_report_stat(false);
     522             :     }
     523        7596 : }
     524             : 
     525             : /*
     526             :  * Process possible state change(s) of tables that are being synchronized.
     527             :  */
     528             : void
     529        7596 : process_syncing_tables(XLogRecPtr current_lsn)
     530             : {
     531        7596 :     if (am_tablesync_worker())
     532           0 :         process_syncing_tables_for_sync(current_lsn);
     533             :     else
     534        7596 :         process_syncing_tables_for_apply(current_lsn);
     535        7596 : }
     536             : 
     537             : /*
     538             :  * Create list of columns for COPY based on logical relation mapping.
     539             :  */
     540             : static List *
     541         102 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
     542             : {
     543         102 :     List       *attnamelist = NIL;
     544             :     int         i;
     545             : 
     546         280 :     for (i = 0; i < rel->remoterel.natts; i++)
     547             :     {
     548         178 :         attnamelist = lappend(attnamelist,
     549         178 :                               makeString(rel->remoterel.attnames[i]));
     550             :     }
     551             : 
     552             : 
     553         102 :     return attnamelist;
     554             : }
     555             : 
     556             : /*
     557             :  * Data source callback for the COPY FROM, which reads from the remote
     558             :  * connection and passes the data back to our local COPY.
     559             :  */
     560             : static int
     561        2344 : copy_read_data(void *outbuf, int minread, int maxread)
     562             : {
     563        2344 :     int         bytesread = 0;
     564             :     int         avail;
     565             : 
     566             :     /* If there are some leftover data from previous read, use it. */
     567        2344 :     avail = copybuf->len - copybuf->cursor;
     568        2344 :     if (avail)
     569             :     {
     570           0 :         if (avail > maxread)
     571           0 :             avail = maxread;
     572           0 :         memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
     573           0 :         copybuf->cursor += avail;
     574           0 :         maxread -= avail;
     575           0 :         bytesread += avail;
     576             :     }
     577             : 
     578        2344 :     while (maxread > 0 && bytesread < minread)
     579             :     {
     580        2344 :         pgsocket    fd = PGINVALID_SOCKET;
     581             :         int         len;
     582        2344 :         char       *buf = NULL;
     583             : 
     584             :         for (;;)
     585             :         {
     586             :             /* Try read the data. */
     587        2344 :             len = walrcv_receive(wrconn, &buf, &fd);
     588             : 
     589        2344 :             CHECK_FOR_INTERRUPTS();
     590             : 
     591        2344 :             if (len == 0)
     592           0 :                 break;
     593        2344 :             else if (len < 0)
     594        2344 :                 return bytesread;
     595             :             else
     596             :             {
     597             :                 /* Process the data */
     598        2242 :                 copybuf->data = buf;
     599        2242 :                 copybuf->len = len;
     600        2242 :                 copybuf->cursor = 0;
     601             : 
     602        2242 :                 avail = copybuf->len - copybuf->cursor;
     603        2242 :                 if (avail > maxread)
     604           0 :                     avail = maxread;
     605        2242 :                 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
     606        2242 :                 outbuf = (void *) ((char *) outbuf + avail);
     607        2242 :                 copybuf->cursor += avail;
     608        2242 :                 maxread -= avail;
     609        2242 :                 bytesread += avail;
     610             :             }
     611             : 
     612        2242 :             if (maxread <= 0 || bytesread >= minread)
     613        2242 :                 return bytesread;
     614             :         }
     615             : 
     616             :         /*
     617             :          * Wait for more data or latch.
     618             :          */
     619           0 :         (void) WaitLatchOrSocket(MyLatch,
     620             :                                  WL_SOCKET_READABLE | WL_LATCH_SET |
     621             :                                  WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     622             :                                  fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
     623             : 
     624           0 :         ResetLatch(MyLatch);
     625             :     }
     626             : 
     627           0 :     return bytesread;
     628             : }
     629             : 
     630             : 
     631             : /*
     632             :  * Get information about remote relation in similar fashion the RELATION
     633             :  * message provides during replication.
     634             :  */
     635             : static void
     636         102 : fetch_remote_table_info(char *nspname, char *relname,
     637             :                         LogicalRepRelation *lrel)
     638             : {
     639             :     WalRcvExecResult *res;
     640             :     StringInfoData cmd;
     641             :     TupleTableSlot *slot;
     642         102 :     Oid         tableRow[] = {OIDOID, CHAROID, CHAROID};
     643         102 :     Oid         attrRow[] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
     644             :     bool        isnull;
     645             :     int         natt;
     646             : 
     647         102 :     lrel->nspname = nspname;
     648         102 :     lrel->relname = relname;
     649             : 
     650             :     /* First fetch Oid and replica identity. */
     651         102 :     initStringInfo(&cmd);
     652         102 :     appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
     653             :                      "  FROM pg_catalog.pg_class c"
     654             :                      "  INNER JOIN pg_catalog.pg_namespace n"
     655             :                      "        ON (c.relnamespace = n.oid)"
     656             :                      " WHERE n.nspname = %s"
     657             :                      "   AND c.relname = %s",
     658             :                      quote_literal_cstr(nspname),
     659             :                      quote_literal_cstr(relname));
     660         102 :     res = walrcv_exec(wrconn, cmd.data, lengthof(tableRow), tableRow);
     661             : 
     662         102 :     if (res->status != WALRCV_OK_TUPLES)
     663           0 :         ereport(ERROR,
     664             :                 (errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
     665             :                         nspname, relname, res->err)));
     666             : 
     667         102 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     668         102 :     if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     669           0 :         ereport(ERROR,
     670             :                 (errmsg("table \"%s.%s\" not found on publisher",
     671             :                         nspname, relname)));
     672             : 
     673         102 :     lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
     674             :     Assert(!isnull);
     675         102 :     lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
     676             :     Assert(!isnull);
     677         102 :     lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
     678             :     Assert(!isnull);
     679             : 
     680         102 :     ExecDropSingleTupleTableSlot(slot);
     681         102 :     walrcv_clear_result(res);
     682             : 
     683             :     /* Now fetch columns. */
     684         102 :     resetStringInfo(&cmd);
     685         102 :     appendStringInfo(&cmd,
     686             :                      "SELECT a.attname,"
     687             :                      "       a.atttypid,"
     688             :                      "       a.atttypmod,"
     689             :                      "       a.attnum = ANY(i.indkey)"
     690             :                      "  FROM pg_catalog.pg_attribute a"
     691             :                      "  LEFT JOIN pg_catalog.pg_index i"
     692             :                      "       ON (i.indexrelid = pg_get_replica_identity_index(%u))"
     693             :                      " WHERE a.attnum > 0::pg_catalog.int2"
     694             :                      "   AND NOT a.attisdropped %s"
     695             :                      "   AND a.attrelid = %u"
     696             :                      " ORDER BY a.attnum",
     697             :                      lrel->remoteid,
     698         102 :                      (walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""),
     699             :                      lrel->remoteid);
     700         102 :     res = walrcv_exec(wrconn, cmd.data, lengthof(attrRow), attrRow);
     701             : 
     702         102 :     if (res->status != WALRCV_OK_TUPLES)
     703           0 :         ereport(ERROR,
     704             :                 (errmsg("could not fetch table info for table \"%s.%s\": %s",
     705             :                         nspname, relname, res->err)));
     706             : 
     707             :     /* We don't know the number of rows coming, so allocate enough space. */
     708         102 :     lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
     709         102 :     lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
     710         102 :     lrel->attkeys = NULL;
     711             : 
     712         102 :     natt = 0;
     713         102 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     714         280 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     715             :     {
     716         356 :         lrel->attnames[natt] =
     717         178 :             TextDatumGetCString(slot_getattr(slot, 1, &isnull));
     718             :         Assert(!isnull);
     719         178 :         lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
     720             :         Assert(!isnull);
     721         178 :         if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
     722          82 :             lrel->attkeys = bms_add_member(lrel->attkeys, natt);
     723             : 
     724             :         /* Should never happen. */
     725         178 :         if (++natt >= MaxTupleAttributeNumber)
     726           0 :             elog(ERROR, "too many columns in remote table \"%s.%s\"",
     727             :                  nspname, relname);
     728             : 
     729         178 :         ExecClearTuple(slot);
     730             :     }
     731         102 :     ExecDropSingleTupleTableSlot(slot);
     732             : 
     733         102 :     lrel->natts = natt;
     734             : 
     735         102 :     walrcv_clear_result(res);
     736         102 :     pfree(cmd.data);
     737         102 : }
     738             : 
     739             : /*
     740             :  * Copy existing data of a table from publisher.
     741             :  *
     742             :  * Caller is responsible for locking the local relation.
     743             :  */
     744             : static void
     745         102 : copy_table(Relation rel)
     746             : {
     747             :     LogicalRepRelMapEntry *relmapentry;
     748             :     LogicalRepRelation lrel;
     749             :     WalRcvExecResult *res;
     750             :     StringInfoData cmd;
     751             :     CopyState   cstate;
     752             :     List       *attnamelist;
     753             :     ParseState *pstate;
     754             : 
     755             :     /* Get the publisher relation info. */
     756         102 :     fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
     757         102 :                             RelationGetRelationName(rel), &lrel);
     758             : 
     759             :     /* Put the relation into relmap. */
     760         102 :     logicalrep_relmap_update(&lrel);
     761             : 
     762             :     /* Map the publisher relation to local one. */
     763         102 :     relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
     764             :     Assert(rel == relmapentry->localrel);
     765             : 
     766             :     /* Start copy on the publisher. */
     767         102 :     initStringInfo(&cmd);
     768         102 :     if (lrel.relkind == RELKIND_RELATION)
     769          94 :         appendStringInfo(&cmd, "COPY %s TO STDOUT",
     770          94 :                          quote_qualified_identifier(lrel.nspname, lrel.relname));
     771             :     else
     772             :     {
     773             :         /*
     774             :          * For non-tables, we need to do COPY (SELECT ...), but we can't just
     775             :          * do SELECT * because we need to not copy generated columns.
     776             :          */
     777           8 :         appendStringInfo(&cmd, "COPY (SELECT ");
     778          24 :         for (int i = 0; i < lrel.natts; i++)
     779             :         {
     780          16 :             appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
     781          16 :             if (i < lrel.natts - 1)
     782           8 :                 appendStringInfoString(&cmd, ", ");
     783             :         }
     784           8 :         appendStringInfo(&cmd, " FROM %s) TO STDOUT",
     785           8 :                          quote_qualified_identifier(lrel.nspname, lrel.relname));
     786             :     }
     787         102 :     res = walrcv_exec(wrconn, cmd.data, 0, NULL);
     788         102 :     pfree(cmd.data);
     789         102 :     if (res->status != WALRCV_OK_COPY_OUT)
     790           0 :         ereport(ERROR,
     791             :                 (errmsg("could not start initial contents copy for table \"%s.%s\": %s",
     792             :                         lrel.nspname, lrel.relname, res->err)));
     793         102 :     walrcv_clear_result(res);
     794             : 
     795         102 :     copybuf = makeStringInfo();
     796             : 
     797         102 :     pstate = make_parsestate(NULL);
     798         102 :     (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
     799             :                                          NULL, false, false);
     800             : 
     801         102 :     attnamelist = make_copy_attnamelist(relmapentry);
     802         102 :     cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL);
     803             : 
     804             :     /* Do the copy */
     805         102 :     (void) CopyFrom(cstate);
     806             : 
     807          98 :     logicalrep_rel_close(relmapentry, NoLock);
     808          98 : }
     809             : 
     810             : /*
     811             :  * Start syncing the table in the sync worker.
     812             :  *
     813             :  * The returned slot name is palloc'ed in current memory context.
     814             :  */
     815             : char *
     816         102 : LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
     817             : {
     818             :     char       *slotname;
     819             :     char       *err;
     820             :     char        relstate;
     821             :     XLogRecPtr  relstate_lsn;
     822             : 
     823             :     /* Check the state of the table synchronization. */
     824         102 :     StartTransactionCommand();
     825         102 :     relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
     826         102 :                                        MyLogicalRepWorker->relid,
     827             :                                        &relstate_lsn, true);
     828         102 :     CommitTransactionCommand();
     829             : 
     830         102 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
     831         102 :     MyLogicalRepWorker->relstate = relstate;
     832         102 :     MyLogicalRepWorker->relstate_lsn = relstate_lsn;
     833         102 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
     834             : 
     835             :     /*
     836             :      * To build a slot name for the sync work, we are limited to NAMEDATALEN -
     837             :      * 1 characters.  We cut the original slot name to NAMEDATALEN - 28 chars
     838             :      * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0').  (It's actually the
     839             :      * NAMEDATALEN on the remote that matters, but this scheme will also work
     840             :      * reasonably if that is different.)
     841             :      */
     842             :     StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small");  /* for sanity */
     843         306 :     slotname = psprintf("%.*s_%u_sync_%u",
     844             :                         NAMEDATALEN - 28,
     845         102 :                         MySubscription->slotname,
     846         102 :                         MySubscription->oid,
     847         102 :                         MyLogicalRepWorker->relid);
     848             : 
     849             :     /*
     850             :      * Here we use the slot name instead of the subscription name as the
     851             :      * application_name, so that it is different from the main apply worker,
     852             :      * so that synchronous replication can distinguish them.
     853             :      */
     854         102 :     wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
     855         102 :     if (wrconn == NULL)
     856           0 :         ereport(ERROR,
     857             :                 (errmsg("could not connect to the publisher: %s", err)));
     858             : 
     859         102 :     switch (MyLogicalRepWorker->relstate)
     860             :     {
     861         102 :         case SUBREL_STATE_INIT:
     862             :         case SUBREL_STATE_DATASYNC:
     863             :             {
     864             :                 Relation    rel;
     865             :                 WalRcvExecResult *res;
     866             : 
     867         102 :                 SpinLockAcquire(&MyLogicalRepWorker->relmutex);
     868         102 :                 MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
     869         102 :                 MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
     870         102 :                 SpinLockRelease(&MyLogicalRepWorker->relmutex);
     871             : 
     872             :                 /* Update the state and make it visible to others. */
     873         102 :                 StartTransactionCommand();
     874         306 :                 UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
     875         102 :                                            MyLogicalRepWorker->relid,
     876         102 :                                            MyLogicalRepWorker->relstate,
     877         102 :                                            MyLogicalRepWorker->relstate_lsn);
     878         102 :                 CommitTransactionCommand();
     879         102 :                 pgstat_report_stat(false);
     880             : 
     881             :                 /*
     882             :                  * We want to do the table data sync in a single transaction.
     883             :                  */
     884         102 :                 StartTransactionCommand();
     885             : 
     886             :                 /*
     887             :                  * Use a standard write lock here. It might be better to
     888             :                  * disallow access to the table while it's being synchronized.
     889             :                  * But we don't want to block the main apply process from
     890             :                  * working and it has to open the relation in RowExclusiveLock
     891             :                  * when remapping remote relation id to local one.
     892             :                  */
     893         102 :                 rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
     894             : 
     895             :                 /*
     896             :                  * Create a temporary slot for the sync process. We do this
     897             :                  * inside the transaction so that we can use the snapshot made
     898             :                  * by the slot to get existing data.
     899             :                  */
     900         102 :                 res = walrcv_exec(wrconn,
     901             :                                   "BEGIN READ ONLY ISOLATION LEVEL "
     902             :                                   "REPEATABLE READ", 0, NULL);
     903         102 :                 if (res->status != WALRCV_OK_COMMAND)
     904           0 :                     ereport(ERROR,
     905             :                             (errmsg("table copy could not start transaction on publisher"),
     906             :                              errdetail("The error was: %s", res->err)));
     907         102 :                 walrcv_clear_result(res);
     908             : 
     909             :                 /*
     910             :                  * Create new temporary logical decoding slot.
     911             :                  *
     912             :                  * We'll use slot for data copy so make sure the snapshot is
     913             :                  * used for the transaction; that way the COPY will get data
     914             :                  * that is consistent with the lsn used by the slot to start
     915             :                  * decoding.
     916             :                  */
     917         102 :                 walrcv_create_slot(wrconn, slotname, true,
     918             :                                    CRS_USE_SNAPSHOT, origin_startpos);
     919             : 
     920         102 :                 PushActiveSnapshot(GetTransactionSnapshot());
     921         102 :                 copy_table(rel);
     922          98 :                 PopActiveSnapshot();
     923             : 
     924          98 :                 res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
     925          98 :                 if (res->status != WALRCV_OK_COMMAND)
     926           0 :                     ereport(ERROR,
     927             :                             (errmsg("table copy could not finish transaction on publisher"),
     928             :                              errdetail("The error was: %s", res->err)));
     929          98 :                 walrcv_clear_result(res);
     930             : 
     931          98 :                 table_close(rel, NoLock);
     932             : 
     933             :                 /* Make the copy visible. */
     934          98 :                 CommandCounterIncrement();
     935             : 
     936             :                 /*
     937             :                  * We are done with the initial data synchronization, update
     938             :                  * the state.
     939             :                  */
     940          98 :                 SpinLockAcquire(&MyLogicalRepWorker->relmutex);
     941          98 :                 MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
     942          98 :                 MyLogicalRepWorker->relstate_lsn = *origin_startpos;
     943          98 :                 SpinLockRelease(&MyLogicalRepWorker->relmutex);
     944             : 
     945             :                 /* Wait for main apply worker to tell us to catchup. */
     946          98 :                 wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
     947             : 
     948             :                 /*----------
     949             :                  * There are now two possible states here:
     950             :                  * a) Sync is behind the apply.  If that's the case we need to
     951             :                  *    catch up with it by consuming the logical replication
     952             :                  *    stream up to the relstate_lsn.  For that, we exit this
     953             :                  *    function and continue in ApplyWorkerMain().
     954             :                  * b) Sync is caught up with the apply.  So it can just set
     955             :                  *    the state to SYNCDONE and finish.
     956             :                  *----------
     957             :                  */
     958          98 :                 if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn)
     959             :                 {
     960             :                     /*
     961             :                      * Update the new state in catalog.  No need to bother
     962             :                      * with the shmem state as we are exiting for good.
     963             :                      */
     964         196 :                     UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
     965          98 :                                                MyLogicalRepWorker->relid,
     966             :                                                SUBREL_STATE_SYNCDONE,
     967             :                                                *origin_startpos);
     968          98 :                     finish_sync_worker();
     969             :                 }
     970           0 :                 break;
     971             :             }
     972           0 :         case SUBREL_STATE_SYNCDONE:
     973             :         case SUBREL_STATE_READY:
     974             :         case SUBREL_STATE_UNKNOWN:
     975             : 
     976             :             /*
     977             :              * Nothing to do here but finish.  (UNKNOWN means the relation was
     978             :              * removed from pg_subscription_rel before the sync worker could
     979             :              * start.)
     980             :              */
     981           0 :             finish_sync_worker();
     982             :             break;
     983           0 :         default:
     984           0 :             elog(ERROR, "unknown relation state \"%c\"",
     985             :                  MyLogicalRepWorker->relstate);
     986             :     }
     987             : 
     988           0 :     return slotname;
     989             : }

Generated by: LCOV version 1.13