LCOV - code coverage report
Current view: top level - src/backend/replication/logical - tablesync.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 260 300 86.7 %
Date: 2019-11-21 14:06:36 Functions: 11 12 91.7 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * tablesync.c
       3             :  *    PostgreSQL logical replication
       4             :  *
       5             :  * Copyright (c) 2012-2019, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/tablesync.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file contains code for initial table data synchronization for
      12             :  *    logical replication.
      13             :  *
      14             :  *    The initial data synchronization is done separately for each table,
      15             :  *    in a separate apply worker that only fetches the initial snapshot data
      16             :  *    from the publisher and then synchronizes the position in the stream with
      17             :  *    the main apply worker.
      18             :  *
      19             :  *    There are several reasons for doing the synchronization this way:
      20             :  *     - It allows us to parallelize the initial data synchronization
      21             :  *       which lowers the time needed for it to happen.
      22             :  *     - The initial synchronization does not have to hold the xid and LSN
      23             :  *       for the time it takes to copy data of all tables, causing less
      24             :  *       bloat and lower disk consumption compared to doing the
      25             :  *       synchronization in a single process for the whole database.
      26             :  *     - It allows us to synchronize any tables added after the initial
      27             :  *       synchronization has finished.
      28             :  *
      29             :  *    The stream position synchronization works in multiple steps.
      30             :  *     - Sync finishes copy and sets worker state as SYNCWAIT and waits for
      31             :  *       state to change in a loop.
      32             :  *     - Apply periodically checks tables that are synchronizing for SYNCWAIT.
      33             :  *       When the desired state appears, it sets the worker state to
      34             :  *       CATCHUP and starts loop-waiting until either the table state is set
      35             :  *       to SYNCDONE or the sync worker exits.
      36             :  *     - After the sync worker has seen the state change to CATCHUP, it will
      37             :  *       read the stream and apply changes (acting like an apply worker) until
      38             :  *       it catches up to the specified stream position.  Then it sets the
      39             :  *       state to SYNCDONE.  There might be zero changes applied between
      40             :  *       CATCHUP and SYNCDONE, because the sync worker might be ahead of the
      41             :  *       apply worker.
      42             :  *     - Once the state is set to SYNCDONE, the apply worker will continue tracking
      43             :  *       the table until it reaches the SYNCDONE stream position, at which
      44             :  *       point it sets state to READY and stops tracking.  Again, there might
      45             :  *       be zero changes in between.
      46             :  *
      47             :  *    So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> CATCHUP ->
      48             :  *    SYNCDONE -> READY.
      49             :  *
      50             :  *    The catalog pg_subscription_rel is used to keep information about
      51             :  *    subscribed tables and their state.  Some transient state during data
      52             :  *    synchronization is kept in shared memory.  The states SYNCWAIT and
      53             :  *    CATCHUP only appear in memory.
      54             :  *
      55             :  *    Example flows look like this:
      56             :  *     - Apply is in front:
      57             :  *        sync:8
      58             :  *          -> set in memory SYNCWAIT
      59             :  *        apply:10
      60             :  *          -> set in memory CATCHUP
      61             :  *          -> enter wait-loop
      62             :  *        sync:10
      63             :  *          -> set in catalog SYNCDONE
      64             :  *          -> exit
      65             :  *        apply:10
      66             :  *          -> exit wait-loop
      67             :  *          -> continue rep
      68             :  *        apply:11
      69             :  *          -> set in catalog READY
      70             :  *     - Sync in front:
      71             :  *        sync:10
      72             :  *          -> set in memory SYNCWAIT
      73             :  *        apply:8
      74             :  *          -> set in memory CATCHUP
      75             :  *          -> continue per-table filtering
      76             :  *        sync:10
      77             :  *          -> set in catalog SYNCDONE
      78             :  *          -> exit
      79             :  *        apply:10
      80             :  *          -> set in catalog READY
      81             :  *          -> stop per-table filtering
      82             :  *          -> continue rep
      83             :  *-------------------------------------------------------------------------
      84             :  */
      85             : 
      86             : #include "postgres.h"
      87             : 
      88             : #include "access/table.h"
      89             : #include "access/xact.h"
      90             : #include "catalog/pg_subscription_rel.h"
      91             : #include "catalog/pg_type.h"
      92             : #include "commands/copy.h"
      93             : #include "miscadmin.h"
      94             : #include "parser/parse_relation.h"
      95             : #include "pgstat.h"
      96             : #include "replication/logicallauncher.h"
      97             : #include "replication/logicalrelation.h"
      98             : #include "replication/walreceiver.h"
      99             : #include "replication/worker_internal.h"
     100             : #include "storage/ipc.h"
     101             : #include "utils/builtins.h"
     102             : #include "utils/lsyscache.h"
     103             : #include "utils/memutils.h"
     104             : #include "utils/snapmgr.h"
     105             : 
     106             : static bool table_states_valid = false;
     107             : 
     108             : StringInfo  copybuf = NULL;
     109             : 
     110             : /*
     111             :  * Exit routine for synchronization worker.
     112             :  */
     113             : static void
     114             : pg_attribute_noreturn()
     115          72 : finish_sync_worker(void)
     116             : {
     117             :     /*
     118             :      * Commit any outstanding transaction. This is the usual case, unless
     119             :      * there was nothing to do for the table.
     120             :      */
     121          72 :     if (IsTransactionState())
     122             :     {
     123          72 :         CommitTransactionCommand();
     124          72 :         pgstat_report_stat(false);
     125             :     }
     126             : 
     127             :     /* And flush all writes. */
     128          72 :     XLogFlush(GetXLogWriteRecPtr());
     129             : 
     130          72 :     StartTransactionCommand();
     131          72 :     ereport(LOG,
     132             :             (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
     133             :                     MySubscription->name,
     134             :                     get_rel_name(MyLogicalRepWorker->relid))));
     135          72 :     CommitTransactionCommand();
     136             : 
     137             :     /* Find the main apply worker and signal it. */
     138          72 :     logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
     139             : 
     140             :     /* Stop gracefully */
     141          72 :     proc_exit(0);
     142             : }
     143             : 
     144             : /*
     145             :  * Wait until the relation synchronization state is set in the catalog to the
     146             :  * expected one.
     147             :  *
     148             :  * Used when transitioning from CATCHUP state to SYNCDONE.
     149             :  *
     150             :  * Returns false if the synchronization worker has disappeared or the table state
     151             :  * has been reset.
     152             :  */
     153             : static bool
     154         142 : wait_for_relation_state_change(Oid relid, char expected_state)
     155             : {
     156             :     char        state;
     157             : 
     158             :     for (;;)
     159          70 :     {
     160             :         LogicalRepWorker *worker;
     161             :         XLogRecPtr  statelsn;
     162             : 
     163         142 :         CHECK_FOR_INTERRUPTS();
     164             : 
     165             :         /* XXX use cache invalidation here to improve performance? */
     166         142 :         PushActiveSnapshot(GetLatestSnapshot());
     167         142 :         state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
     168             :                                         relid, &statelsn, true);
     169         142 :         PopActiveSnapshot();
     170             : 
     171         142 :         if (state == SUBREL_STATE_UNKNOWN)
     172          72 :             return false;
     173             : 
     174         142 :         if (state == expected_state)
     175          70 :             return true;
     176             : 
     177             :         /* Take the worker lock in shared mode for the worker lookup. */
     178          72 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     179             : 
     180             :         /* Check if the opposite worker is still running and bail if not. */
     181          72 :         worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
     182          72 :                                         am_tablesync_worker() ? InvalidOid : relid,
     183             :                                         false);
     184          72 :         LWLockRelease(LogicalRepWorkerLock);
     185          72 :         if (!worker)
     186           2 :             return false;
     187             : 
     188          70 :         (void) WaitLatch(MyLatch,
     189             :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     190             :                          1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
     191             : 
     192          70 :         ResetLatch(MyLatch);
     193             :     }
     194             : 
     195             :     return false;
     196             : }
     197             : 
     198             : /*
     199             :  * Wait until the apply worker changes the state of our synchronization
     200             :  * worker to the expected one.
     201             :  *
     202             :  * Used when transitioning from SYNCWAIT state to CATCHUP.
     203             :  *
     204             :  * Returns false if the apply worker has disappeared.
     205             :  */
     206             : static bool
     207         144 : wait_for_worker_state_change(char expected_state)
     208             : {
     209             :     int         rc;
     210             : 
     211             :     for (;;)
     212          72 :     {
     213             :         LogicalRepWorker *worker;
     214             : 
     215         144 :         CHECK_FOR_INTERRUPTS();
     216             : 
     217             :         /*
     218             :          * Done if already in correct state.  (We assume this fetch is atomic
     219             :          * enough to not give a misleading answer if we do it with no lock.)
     220             :          */
     221         144 :         if (MyLogicalRepWorker->relstate == expected_state)
     222          72 :             return true;
     223             : 
     224             :         /*
     225             :          * Bail out if the apply worker has died, else signal it we're
     226             :          * waiting.
     227             :          */
     228          72 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     229          72 :         worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
     230             :                                         InvalidOid, false);
     231          72 :         if (worker && worker->proc)
     232          72 :             logicalrep_worker_wakeup_ptr(worker);
     233          72 :         LWLockRelease(LogicalRepWorkerLock);
     234          72 :         if (!worker)
     235           0 :             break;
     236             : 
     237             :         /*
     238             :          * Wait.  We expect to get a latch signal back from the apply worker,
     239             :          * but use a timeout in case it dies without sending one.
     240             :          */
     241          72 :         rc = WaitLatch(MyLatch,
     242             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     243             :                        1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
     244             : 
     245          72 :         if (rc & WL_LATCH_SET)
     246          72 :             ResetLatch(MyLatch);
     247             :     }
     248             : 
     249           0 :     return false;
     250             : }
     251             : 
     252             : /*
     253             :  * Callback from syscache invalidation.
     254             :  */
     255             : void
     256         278 : invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
     257             : {
     258         278 :     table_states_valid = false;
     259         278 : }
     260             : 
     261             : /*
     262             :  * Handle table synchronization cooperation from the synchronization
     263             :  * worker.
     264             :  *
     265             :  * If the sync worker is in CATCHUP state and reached (or passed) the
     266             :  * predetermined synchronization point in the WAL stream, mark the table as
     267             :  * SYNCDONE and finish.
     268             :  */
     269             : static void
     270           0 : process_syncing_tables_for_sync(XLogRecPtr current_lsn)
     271             : {
     272             :     Assert(IsTransactionState());
     273             : 
     274           0 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
     275             : 
     276           0 :     if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
     277           0 :         current_lsn >= MyLogicalRepWorker->relstate_lsn)
     278             :     {
     279             :         TimeLineID  tli;
     280             : 
     281           0 :         MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
     282           0 :         MyLogicalRepWorker->relstate_lsn = current_lsn;
     283             : 
     284           0 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
     285             : 
     286           0 :         UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
     287           0 :                                    MyLogicalRepWorker->relid,
     288           0 :                                    MyLogicalRepWorker->relstate,
     289           0 :                                    MyLogicalRepWorker->relstate_lsn);
     290             : 
     291           0 :         walrcv_endstreaming(wrconn, &tli);
     292           0 :         finish_sync_worker();
     293             :     }
     294             :     else
     295           0 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
     296           0 : }
     297             : 
     298             : /*
     299             :  * Handle table synchronization cooperation from the apply worker.
     300             :  *
     301             :  * Walk over all subscription tables that are individually tracked by the
     302             :  * apply process (currently, all that have state other than
     303             :  * SUBREL_STATE_READY) and manage synchronization for them.
     304             :  *
     305             :  * If there are tables that need synchronizing and are not being synchronized
     306             :  * yet, start sync workers for them (if there are free slots for sync
     307             :  * workers).  To prevent starting the sync worker for the same relation at a
     308             :  * high frequency after a failure, we store its last start time with each sync
     309             :  * state info.  We start the sync worker for the same relation after waiting
     310             :  * at least wal_retrieve_retry_interval.
     311             :  *
     312             :  * For tables that are being synchronized already, check if sync workers
     313             :  * either need action from the apply worker or have finished.  This is the
     314             :  * SYNCWAIT to CATCHUP transition.
     315             :  *
     316             :  * If the synchronization position is reached (SYNCDONE), then the table can
     317             :  * be marked as READY and is no longer tracked.
     318             :  */
     319             : static void
     320        6934 : process_syncing_tables_for_apply(XLogRecPtr current_lsn)
     321             : {
     322             :     struct tablesync_start_time_mapping
     323             :     {
     324             :         Oid         relid;
     325             :         TimestampTz last_start_time;
     326             :     };
     327             :     static List *table_states = NIL;
     328             :     static HTAB *last_start_times = NULL;
     329             :     ListCell   *lc;
     330        6934 :     bool        started_tx = false;
     331             : 
     332             :     Assert(!IsTransactionState());
     333             : 
     334             :     /* We need up-to-date sync state info for subscription tables here. */
     335        6934 :     if (!table_states_valid)
     336             :     {
     337             :         MemoryContext oldctx;
     338             :         List       *rstates;
     339             :         ListCell   *lc;
     340             :         SubscriptionRelState *rstate;
     341             : 
     342             :         /* Clean the old list. */
     343         238 :         list_free_deep(table_states);
     344         238 :         table_states = NIL;
     345             : 
     346         238 :         StartTransactionCommand();
     347         238 :         started_tx = true;
     348             : 
     349             :         /* Fetch all non-ready tables. */
     350         238 :         rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
     351             : 
     352             :         /* Allocate the tracking info in a permanent memory context. */
     353         238 :         oldctx = MemoryContextSwitchTo(CacheMemoryContext);
     354         818 :         foreach(lc, rstates)
     355             :         {
     356         580 :             rstate = palloc(sizeof(SubscriptionRelState));
     357         580 :             memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
     358         580 :             table_states = lappend(table_states, rstate);
     359             :         }
     360         238 :         MemoryContextSwitchTo(oldctx);
     361             : 
     362         238 :         table_states_valid = true;
     363             :     }
     364             : 
     365             :     /*
     366             :      * Prepare a hash table for tracking last start times of workers, to avoid
     367             :      * immediate restarts.  We don't need it if there are no tables that need
     368             :      * syncing.
     369             :      */
     370        6934 :     if (table_states && !last_start_times)
     371          32 :     {
     372             :         HASHCTL     ctl;
     373             : 
     374          32 :         memset(&ctl, 0, sizeof(ctl));
     375          32 :         ctl.keysize = sizeof(Oid);
     376          32 :         ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
     377          32 :         last_start_times = hash_create("Logical replication table sync worker start times",
     378             :                                        256, &ctl, HASH_ELEM | HASH_BLOBS);
     379             :     }
     380             : 
     381             :     /*
     382             :      * Clean up the hash table when we're done with all tables (just to
     383             :      * release the bit of memory).
     384             :      */
     385        6902 :     else if (!table_states && last_start_times)
     386             :     {
     387          28 :         hash_destroy(last_start_times);
     388          28 :         last_start_times = NULL;
     389             :     }
     390             : 
     391             :     /*
     392             :      * Process all tables that are being synchronized.
     393             :      */
     394        7630 :     foreach(lc, table_states)
     395             :     {
     396         696 :         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
     397             : 
     398         696 :         if (rstate->state == SUBREL_STATE_SYNCDONE)
     399             :         {
     400             :             /*
     401             :              * Apply has caught up to the position where the table sync has
     402             :              * finished.  Mark the table as ready so that the apply will just
     403             :              * continue to replicate it normally.
     404             :              */
     405          72 :             if (current_lsn >= rstate->lsn)
     406             :             {
     407          72 :                 rstate->state = SUBREL_STATE_READY;
     408          72 :                 rstate->lsn = current_lsn;
     409          72 :                 if (!started_tx)
     410             :                 {
     411           0 :                     StartTransactionCommand();
     412           0 :                     started_tx = true;
     413             :                 }
     414             : 
     415         144 :                 UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
     416          72 :                                            rstate->relid, rstate->state,
     417             :                                            rstate->lsn);
     418             :             }
     419             :         }
     420             :         else
     421             :         {
     422             :             LogicalRepWorker *syncworker;
     423             : 
     424             :             /*
     425             :              * Look for a sync worker for this relation.
     426             :              */
     427         624 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     428             : 
     429         624 :             syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
     430             :                                                 rstate->relid, false);
     431             : 
     432         624 :             if (syncworker)
     433             :             {
     434             :                 /* Found one, update our copy of its state */
     435         186 :                 SpinLockAcquire(&syncworker->relmutex);
     436         186 :                 rstate->state = syncworker->relstate;
     437         186 :                 rstate->lsn = syncworker->relstate_lsn;
     438         186 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
     439             :                 {
     440             :                     /*
     441             :                      * Sync worker is waiting for apply.  Tell sync worker it
     442             :                      * can catchup now.
     443             :                      */
     444          72 :                     syncworker->relstate = SUBREL_STATE_CATCHUP;
     445          72 :                     syncworker->relstate_lsn =
     446          72 :                         Max(syncworker->relstate_lsn, current_lsn);
     447             :                 }
     448         186 :                 SpinLockRelease(&syncworker->relmutex);
     449             : 
     450             :                 /* If we told worker to catch up, wait for it. */
     451         186 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
     452             :                 {
     453             :                     /* Signal the sync worker, as it may be waiting for us. */
     454          72 :                     if (syncworker->proc)
     455          72 :                         logicalrep_worker_wakeup_ptr(syncworker);
     456             : 
     457             :                     /* Now safe to release the LWLock */
     458          72 :                     LWLockRelease(LogicalRepWorkerLock);
     459             : 
     460             :                     /*
     461             :                      * Enter wait loop and wait for synchronization worker to
     462             :                      * reach expected state (or die trying).
     463             :                      */
     464          72 :                     if (!started_tx)
     465             :                     {
     466          38 :                         StartTransactionCommand();
     467          38 :                         started_tx = true;
     468             :                     }
     469             : 
     470          72 :                     wait_for_relation_state_change(rstate->relid,
     471             :                                                    SUBREL_STATE_SYNCDONE);
     472             :                 }
     473             :                 else
     474         114 :                     LWLockRelease(LogicalRepWorkerLock);
     475             :             }
     476             :             else
     477             :             {
     478             :                 /*
     479             :                  * If there is no sync worker for this table yet, count
     480             :                  * running sync workers for this subscription, while we have
     481             :                  * the lock.
     482             :                  */
     483         438 :                 int         nsyncworkers =
     484         438 :                 logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
     485             : 
     486             :                 /* Now safe to release the LWLock */
     487         438 :                 LWLockRelease(LogicalRepWorkerLock);
     488             : 
     489             :                 /*
     490             :                  * If there are free sync worker slot(s), start a new sync
     491             :                  * worker for the table.
     492             :                  */
     493         438 :                 if (nsyncworkers < max_sync_workers_per_subscription)
     494             :                 {
     495          96 :                     TimestampTz now = GetCurrentTimestamp();
     496             :                     struct tablesync_start_time_mapping *hentry;
     497             :                     bool        found;
     498             : 
     499          96 :                     hentry = hash_search(last_start_times, &rstate->relid,
     500             :                                          HASH_ENTER, &found);
     501             : 
     502         120 :                     if (!found ||
     503          24 :                         TimestampDifferenceExceeds(hentry->last_start_time, now,
     504             :                                                    wal_retrieve_retry_interval))
     505             :                     {
     506         246 :                         logicalrep_worker_launch(MyLogicalRepWorker->dbid,
     507          82 :                                                  MySubscription->oid,
     508          82 :                                                  MySubscription->name,
     509          82 :                                                  MyLogicalRepWorker->userid,
     510             :                                                  rstate->relid);
     511          82 :                         hentry->last_start_time = now;
     512             :                     }
     513             :                 }
     514             :             }
     515             :         }
     516             :     }
     517             : 
     518        6934 :     if (started_tx)
     519             :     {
     520         276 :         CommitTransactionCommand();
     521         276 :         pgstat_report_stat(false);
     522             :     }
     523        6934 : }
     524             : 
     525             : /*
     526             :  * Process possible state change(s) of tables that are being synchronized.
     527             :  */
     528             : void
     529        6934 : process_syncing_tables(XLogRecPtr current_lsn)
     530             : {
     531        6934 :     if (am_tablesync_worker())
     532           0 :         process_syncing_tables_for_sync(current_lsn);
     533             :     else
     534        6934 :         process_syncing_tables_for_apply(current_lsn);
     535        6934 : }
     536             : 
     537             : /*
     538             :  * Create list of columns for COPY based on logical relation mapping.
     539             :  */
     540             : static List *
     541          76 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
     542             : {
     543          76 :     List       *attnamelist = NIL;
     544             :     int         i;
     545             : 
     546         202 :     for (i = 0; i < rel->remoterel.natts; i++)
     547             :     {
     548         126 :         attnamelist = lappend(attnamelist,
     549         126 :                               makeString(rel->remoterel.attnames[i]));
     550             :     }
     551             : 
     552             : 
     553          76 :     return attnamelist;
     554             : }
     555             : 
     556             : /*
     557             :  * Data source callback for the COPY FROM, which reads from the remote
     558             :  * connection and passes the data back to our local COPY.
                     :  *
                     :  * outbuf receives the bytes.  At most maxread bytes are written, and the
                     :  * function keeps reading until at least minread bytes have been gathered
                     :  * or maxread is used up.  If walrcv_receive() reports a negative length
                     :  * first, whatever has been gathered so far is returned.  The return value
                     :  * is the number of bytes placed in outbuf.
                     :  *
                     :  * Bytes left in copybuf by an earlier call are handed out first.  After
                     :  * that, each chunk returned by walrcv_receive() becomes the new contents
                     :  * of copybuf (data, len and cursor are overwritten), and the cursor is
                     :  * advanced only by what was copied out, so anything that did not fit into
                     :  * maxread is still there for the next call.  When no data is available
                     :  * (length 0) we wait on our latch and on the socket reported by
                     :  * walrcv_receive() before trying again.
                     :  *
                     :  * NOTE(review): the leftover-data branch does not advance outbuf, unlike
                     :  * the receive path further down.  If that branch delivered fewer than
                     :  * minread bytes and the loop were entered, the next memcpy() would
                     :  * overwrite them.  Presumably callers always pass minread = 1, which
                     :  * would make that unreachable -- TODO confirm.
     559             :  */
     560             : static int
     561        2318 : copy_read_data(void *outbuf, int minread, int maxread)
     562             : {
     563        2318 :     int         bytesread = 0;
     564             :     int         avail;
     565             : 
     566             :     /* If there is some leftover data from a previous read, use it first. */
     567        2318 :     avail = copybuf->len - copybuf->cursor;
     568        2318 :     if (avail)
     569             :     {
     570           0 :         if (avail > maxread)
     571           0 :             avail = maxread;
     572           0 :         memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
     573           0 :         copybuf->cursor += avail;
     574           0 :         maxread -= avail;
     575           0 :         bytesread += avail;
     576             :     }
     577             : 
     578        4638 :     while (maxread > 0 && bytesread < minread)
     579             :     {
     580        2320 :         pgsocket    fd = PGINVALID_SOCKET;
     581             :         int         len;
     582        2320 :         char       *buf = NULL;
     583             : 
     584             :         for (;;)
     585             :         {
     586             :             /* Try to read the data. */
     587        2320 :             len = walrcv_receive(wrconn, &buf, &fd);
     588             : 
     589        2320 :             CHECK_FOR_INTERRUPTS();
     590             : 
     591        2320 :             if (len == 0)
     592           2 :                 break;
     593        2318 :             else if (len < 0)
     594        2394 :                 return bytesread;
     595             :             else
     596             :             {
     597             :                 /* Process the data: point copybuf at it and copy out what fits */
     598        2242 :                 copybuf->data = buf;
     599        2242 :                 copybuf->len = len;
     600        2242 :                 copybuf->cursor = 0;
     601             : 
     602        2242 :                 avail = copybuf->len - copybuf->cursor;
     603        2242 :                 if (avail > maxread)
     604           0 :                     avail = maxread;
     605        2242 :                 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
     606        2242 :                 outbuf = (void *) ((char *) outbuf + avail);
     607        2242 :                 copybuf->cursor += avail;
     608        2242 :                 maxread -= avail;
     609        2242 :                 bytesread += avail;
     610             :             }
     611             : 
     612        2242 :             if (maxread <= 0 || bytesread >= minread)
     613        2242 :                 return bytesread;
     614             :         }
     615             : 
     616             :         /*
     617             :          * Wait for more data or latch.
     618             :          */
     619           2 :         (void) WaitLatchOrSocket(MyLatch,
     620             :                                  WL_SOCKET_READABLE | WL_LATCH_SET |
     621             :                                  WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     622             :                                  fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
     623             : 
     624           2 :         ResetLatch(MyLatch);
     625             :     }
     626             : 
     627           0 :     return bytesread;
     628             : }
     629             : 
     630             : 
     631             : /*
     632             :  * Get information about the remote relation, in a similar fashion to what
     633             :  * the RELATION message provides during replication.
                     :  *
                     :  * Runs two queries on the publisher through wrconn and fills *lrel:
                     :  *   nspname, relname  - the pointers passed in, stored as they are
                     :  *   remoteid          - c.oid of the relation
                     :  *   replident         - c.relreplident of the relation
                     :  *   attnames, atttyps - one entry per non-dropped column with attnum > 0,
                     :  *                       ordered by attnum
                     :  *   attkeys           - bitmapset of the zero-based positions (within
                     :  *                       attnames) of the columns for which
                     :  *                       a.attnum = ANY(i.indkey) came back true
                     :  *   natts             - number of columns fetched
                     :  *
                     :  * Only relations with relkind 'r' are looked up.  When
                     :  * walrcv_server_version() reports 120000 or later, the column query also
                     :  * requires a.attgenerated = ''.  An ERROR is raised if either query does
                     :  * not return WALRCV_OK_TUPLES or if the relation is not found.
                     :  *
                     :  * a.atttypmod is part of the second query's result but is not read here.
                     :  *
                     :  * NOTE(review): the isnull flag for the fourth column is not examined.
                     :  * The LEFT JOIN suggests the value can be NULL when there is no replica
                     :  * identity index -- confirm that DatumGetBool() on such a value is what
                     :  * is intended.
     634             :  */
     635             : static void
     636          76 : fetch_remote_table_info(char *nspname, char *relname,
     637             :                         LogicalRepRelation *lrel)
     638             : {
     639             :     WalRcvExecResult *res;
     640             :     StringInfoData cmd;
     641             :     TupleTableSlot *slot;
     642          76 :     Oid         tableRow[2] = {OIDOID, CHAROID};
     643          76 :     Oid         attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
     644             :     bool        isnull;
     645             :     int         natt;
     646             : 
     647          76 :     lrel->nspname = nspname;
     648          76 :     lrel->relname = relname;
     649             : 
     650             :     /* First fetch Oid and replica identity. */
     651          76 :     initStringInfo(&cmd);
     652          76 :     appendStringInfo(&cmd, "SELECT c.oid, c.relreplident"
     653             :                      "  FROM pg_catalog.pg_class c"
     654             :                      "  INNER JOIN pg_catalog.pg_namespace n"
     655             :                      "        ON (c.relnamespace = n.oid)"
     656             :                      " WHERE n.nspname = %s"
     657             :                      "   AND c.relname = %s"
     658             :                      "   AND c.relkind = 'r'",
     659             :                      quote_literal_cstr(nspname),
     660             :                      quote_literal_cstr(relname));
     661          76 :     res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
     662             : 
     663          76 :     if (res->status != WALRCV_OK_TUPLES)
     664           0 :         ereport(ERROR,
     665             :                 (errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
     666             :                         nspname, relname, res->err)));
     667             : 
     668          76 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     669          76 :     if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     670           0 :         ereport(ERROR,
     671             :                 (errmsg("table \"%s.%s\" not found on publisher",
     672             :                         nspname, relname)));
     673             : 
     674          76 :     lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
     675             :     Assert(!isnull);
     676          76 :     lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
     677             :     Assert(!isnull);
     678             : 
     679          76 :     ExecDropSingleTupleTableSlot(slot);
     680          76 :     walrcv_clear_result(res);
     681             : 
     682             :     /* Now fetch columns, reusing the command buffer. */
     683          76 :     resetStringInfo(&cmd);
     684         152 :     appendStringInfo(&cmd,
     685             :                      "SELECT a.attname,"
     686             :                      "       a.atttypid,"
     687             :                      "       a.atttypmod,"
     688             :                      "       a.attnum = ANY(i.indkey)"
     689             :                      "  FROM pg_catalog.pg_attribute a"
     690             :                      "  LEFT JOIN pg_catalog.pg_index i"
     691             :                      "       ON (i.indexrelid = pg_get_replica_identity_index(%u))"
     692             :                      " WHERE a.attnum > 0::pg_catalog.int2"
     693             :                      "   AND NOT a.attisdropped %s"
     694             :                      "   AND a.attrelid = %u"
     695             :                      " ORDER BY a.attnum",
     696             :                      lrel->remoteid,
     697          76 :                      (walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""),
     698             :                      lrel->remoteid);
     699          76 :     res = walrcv_exec(wrconn, cmd.data, 4, attrRow);
     700             : 
     701          76 :     if (res->status != WALRCV_OK_TUPLES)
     702           0 :         ereport(ERROR,
     703             :                 (errmsg("could not fetch table info for table \"%s.%s\": %s",
     704             :                         nspname, relname, res->err)));
     705             : 
     706             :     /* We don't know the number of rows coming, so allocate for the maximum. */
     707          76 :     lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
     708          76 :     lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
     709          76 :     lrel->attkeys = NULL;
     710             : 
     711          76 :     natt = 0;
     712          76 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     713         278 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     714             :     {
     715         252 :         lrel->attnames[natt] =
     716         126 :             TextDatumGetCString(slot_getattr(slot, 1, &isnull));
     717             :         Assert(!isnull);
     718         126 :         lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
     719             :         Assert(!isnull);
     720         126 :         if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
     721          60 :             lrel->attkeys = bms_add_member(lrel->attkeys, natt);
     722             : 
     723             :         /* Should never happen; checked before the next row could overrun the arrays. */
     724         126 :         if (++natt >= MaxTupleAttributeNumber)
     725           0 :             elog(ERROR, "too many columns in remote table \"%s.%s\"",
     726             :                  nspname, relname);
     727             : 
     728         126 :         ExecClearTuple(slot);
     729             :     }
     730          76 :     ExecDropSingleTupleTableSlot(slot);
     731             : 
     732          76 :     lrel->natts = natt;
     733             : 
     734          76 :     walrcv_clear_result(res);
     735          76 :     pfree(cmd.data);
     736          76 : }
     737             : 
     738             : /*
     739             :  * Copy existing data of a table from publisher.
     740             :  *
     741             :  * Caller is responsible for locking the local relation.
                     :  *
                     :  * The steps are: fetch the remote relation description, register it in
                     :  * the relation map and open the mapping (asserting that it resolves to
                     :  * rel), issue "COPY ... TO STDOUT" on the publisher connection, and then
                     :  * run a local COPY FROM on rel whose input is supplied by
                     :  * copy_read_data().  The column list for the local COPY is built from the
                     :  * remote attribute names.
                     :  *
                     :  * The copybuf variable (declared outside this function) is replaced with
                     :  * a fresh StringInfo before the local COPY starts.  An ERROR is raised if
                     :  * the publisher does not answer the COPY command with WALRCV_OK_COPY_OUT.
                     :  *
                     :  * NOTE(review): cstate and pstate are not explicitly released in this
                     :  * function -- presumably cleanup is left to transaction or memory context
                     :  * teardown; confirm.
     742             :  */
     743             : static void
     744          76 : copy_table(Relation rel)
     745             : {
     746             :     LogicalRepRelMapEntry *relmapentry;
     747             :     LogicalRepRelation lrel;
     748             :     WalRcvExecResult *res;
     749             :     StringInfoData cmd;
     750             :     CopyState   cstate;
     751             :     List       *attnamelist;
     752             :     ParseState *pstate;
     753             : 
     754             :     /* Get the publisher relation info. */
     755          76 :     fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
     756          76 :                             RelationGetRelationName(rel), &lrel);
     757             : 
     758             :     /* Put the relation into relmap. */
     759          76 :     logicalrep_relmap_update(&lrel);
     760             : 
     761             :     /* Map the publisher relation to local one. */
     762          76 :     relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
     763             :     Assert(rel == relmapentry->localrel);
     764             : 
     765             :     /* Start copy on the publisher. */
     766          76 :     initStringInfo(&cmd);
     767          76 :     appendStringInfo(&cmd, "COPY %s TO STDOUT",
     768          76 :                      quote_qualified_identifier(lrel.nspname, lrel.relname));
     769          76 :     res = walrcv_exec(wrconn, cmd.data, 0, NULL);
     770          76 :     pfree(cmd.data);
     771          76 :     if (res->status != WALRCV_OK_COPY_OUT)
     772           0 :         ereport(ERROR,
     773             :                 (errmsg("could not start initial contents copy for table \"%s.%s\": %s",
     774             :                         lrel.nspname, lrel.relname, res->err)));
     775          76 :     walrcv_clear_result(res);
     776             : 
     777          76 :     copybuf = makeStringInfo();
     778             : 
     779          76 :     pstate = make_parsestate(NULL);
     780          76 :     addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
     781             :                                   NULL, false, false);
     782             : 
     783          76 :     attnamelist = make_copy_attnamelist(relmapentry);
     784          76 :     cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL);
     785             : 
     786             :     /* Do the copy; the returned value is deliberately ignored. */
     787          76 :     (void) CopyFrom(cstate);
     788             : 
     789          72 :     logicalrep_rel_close(relmapentry, NoLock);
     790          72 : }
     791             : 
     792             : /*
     793             :  * Start syncing the table in the sync worker.
     794             :  *
     795             :  * The returned slot name is palloc'ed in current memory context.
                     :  *
                     :  * origin_startpos is an output argument: it is handed to
                     :  * walrcv_create_slot() (which presumably fills it in) and its value is
                     :  * afterwards recorded as the relation's state LSN.
                     :  *
                     :  * The relation's state is first read from the catalog and copied into
                     :  * MyLogicalRepWorker, then a connection to the publisher is opened.  What
                     :  * happens next depends on that state:
                     :  *
                     :  * - INIT or DATASYNC: mark the relation DATASYNC, copy the table inside a
                     :  *   single local transaction using the snapshot of a newly created
                     :  *   temporary slot, set the worker's relstate to SYNCWAIT and wait for
                     :  *   CATCHUP.  If no catchup is needed, the catalog state is set to
                     :  *   SYNCDONE and finish_sync_worker() is called.  Otherwise the function
                     :  *   returns the slot name; no CommitTransactionCommand() is visible in
                     :  *   this function after the copy.
                     :  * - SYNCDONE, READY or UNKNOWN: just call finish_sync_worker().
                     :  * - anything else: raise an ERROR.
     796             :  */
     797             : char *
     798          76 : LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
     799             : {
     800             :     char       *slotname;
     801             :     char       *err;
     802             :     char        relstate;
     803             :     XLogRecPtr  relstate_lsn;
     804             : 
     805             :     /* Check the state of the table synchronization. */
     806          76 :     StartTransactionCommand();
     807          76 :     relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
     808          76 :                                        MyLogicalRepWorker->relid,
     809             :                                        &relstate_lsn, true);
     810          76 :     CommitTransactionCommand();
     811             : 
     812          76 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
     813          76 :     MyLogicalRepWorker->relstate = relstate;
     814          76 :     MyLogicalRepWorker->relstate_lsn = relstate_lsn;
     815          76 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
     816             : 
     817             :     /*
     818             :      * To build a slot name for the sync work, we are limited to NAMEDATALEN -
     819             :      * 1 characters.  We cut the original slot name to NAMEDATALEN - 28 chars
     820             :      * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0').  (It's actually the
     821             :      * NAMEDATALEN on the remote that matters, but this scheme will also work
     822             :      * reasonably if that is different.)
     823             :      */
     824             :     StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small");  /* for sanity */
     825         228 :     slotname = psprintf("%.*s_%u_sync_%u",
     826             :                         NAMEDATALEN - 28,
     827          76 :                         MySubscription->slotname,
     828          76 :                         MySubscription->oid,
     829          76 :                         MyLogicalRepWorker->relid);
     830             : 
     831             :     /*
     832             :      * Here we use the slot name instead of the subscription name as the
     833             :      * application_name, so that it is different from the main apply worker,
     834             :      * so that synchronous replication can distinguish them.
     835             :      */
     836          76 :     wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
     837          76 :     if (wrconn == NULL)
     838           0 :         ereport(ERROR,
     839             :                 (errmsg("could not connect to the publisher: %s", err)));
     840             : 
     841          76 :     switch (MyLogicalRepWorker->relstate)
     842             :     {
     843             :         case SUBREL_STATE_INIT:
     844             :         case SUBREL_STATE_DATASYNC:
     845             :             {
     846             :                 Relation    rel;
     847             :                 WalRcvExecResult *res;
     848             : 
     849          76 :                 SpinLockAcquire(&MyLogicalRepWorker->relmutex);
     850          76 :                 MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
     851          76 :                 MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
     852          76 :                 SpinLockRelease(&MyLogicalRepWorker->relmutex);
     853             : 
     854             :                 /* Update the state in the catalog and make it visible to others. */
     855          76 :                 StartTransactionCommand();
     856         228 :                 UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
     857          76 :                                            MyLogicalRepWorker->relid,
     858          76 :                                            MyLogicalRepWorker->relstate,
     859          76 :                                            MyLogicalRepWorker->relstate_lsn);
     860          76 :                 CommitTransactionCommand();
     861          76 :                 pgstat_report_stat(false);
     862             : 
     863             :                 /*
     864             :                  * We want to do the table data sync in a single transaction.
     865             :                  */
     866          76 :                 StartTransactionCommand();
     867             : 
     868             :                 /*
     869             :                  * Use a standard write lock here. It might be better to
     870             :                  * disallow access to the table while it's being synchronized.
     871             :                  * But we don't want to block the main apply process from
     872             :                  * working and it has to open the relation in RowExclusiveLock
     873             :                  * when remapping remote relation id to local one.
     874             :                  */
     875          76 :                 rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
     876             : 
     877             :                 /*
     878             :                  * Start a transaction on the publisher.  The temporary slot
     879             :                  * for the sync process is created inside it (below) so that
     880             :                  * we can use the snapshot made by the slot to get existing data.
     881             :                  */
     882          76 :                 res = walrcv_exec(wrconn,
     883             :                                   "BEGIN READ ONLY ISOLATION LEVEL "
     884             :                                   "REPEATABLE READ", 0, NULL);
     885          76 :                 if (res->status != WALRCV_OK_COMMAND)
     886           0 :                     ereport(ERROR,
     887             :                             (errmsg("table copy could not start transaction on publisher"),
     888             :                              errdetail("The error was: %s", res->err)));
     889          76 :                 walrcv_clear_result(res);
     890             : 
     891             :                 /*
     892             :                  * Create new temporary logical decoding slot.
     893             :                  *
     894             :                  * We'll use slot for data copy so make sure the snapshot is
     895             :                  * used for the transaction; that way the COPY will get data
     896             :                  * that is consistent with the lsn used by the slot to start
     897             :                  * decoding.
     898             :                  */
     899          76 :                 walrcv_create_slot(wrconn, slotname, true,
     900             :                                    CRS_USE_SNAPSHOT, origin_startpos);
     901             : 
     902          76 :                 PushActiveSnapshot(GetTransactionSnapshot());
     903          76 :                 copy_table(rel);
     904          72 :                 PopActiveSnapshot();
     905             : 
     906          72 :                 res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
     907          72 :                 if (res->status != WALRCV_OK_COMMAND)
     908           0 :                     ereport(ERROR,
     909             :                             (errmsg("table copy could not finish transaction on publisher"),
     910             :                              errdetail("The error was: %s", res->err)));
     911          72 :                 walrcv_clear_result(res);
     912             : 
     913          72 :                 table_close(rel, NoLock);
     914             : 
     915             :                 /* Make the copy visible. */
     916          72 :                 CommandCounterIncrement();
     917             : 
     918             :                 /*
     919             :                  * We are done with the initial data synchronization, update
     920             :                  * the state.
     921             :                  */
     922          72 :                 SpinLockAcquire(&MyLogicalRepWorker->relmutex);
     923          72 :                 MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
     924          72 :                 MyLogicalRepWorker->relstate_lsn = *origin_startpos;
     925          72 :                 SpinLockRelease(&MyLogicalRepWorker->relmutex);
     926             : 
     927             :                 /* Wait for main apply worker to tell us to catchup. */
     928          72 :                 wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
     929             : 
     930             :                 /*----------
     931             :                  * There are now two possible states here:
     932             :                  * a) Sync is behind the apply.  If that's the case we need to
     933             :                  *    catch up with it by consuming the logical replication
     934             :                  *    stream up to the relstate_lsn.  For that, we exit this
     935             :                  *    function and continue in ApplyWorkerMain().
     936             :                  * b) Sync is caught up with the apply.  So it can just set
     937             :                  *    the state to SYNCDONE and finish.
     938             :                  *----------
     939             :                  */
     940          72 :                 if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn)
     941             :                 {
     942             :                     /*
     943             :                      * Update the new state in catalog.  No need to bother
     944             :                      * with the shmem state as we are exiting for good.
     945             :                      */
     946         144 :                     UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
     947          72 :                                                MyLogicalRepWorker->relid,
     948             :                                                SUBREL_STATE_SYNCDONE,
     949             :                                                *origin_startpos);
     950          72 :                     finish_sync_worker();
     951             :                 }
     952           0 :                 break;
     953             :             }
     954             :         case SUBREL_STATE_SYNCDONE:
     955             :         case SUBREL_STATE_READY:
     956             :         case SUBREL_STATE_UNKNOWN:
     957             : 
     958             :             /*
     959             :              * Nothing to do here but finish.  (UNKNOWN means the relation was
     960             :              * removed from pg_subscription_rel before the sync worker could
     961             :              * start.)
     962             :              */
     963           0 :             finish_sync_worker();
     964             :             break;
     965             :         default:
     966           0 :             elog(ERROR, "unknown relation state \"%c\"",
     967             :                  MyLogicalRepWorker->relstate);
     968             :     }
     969             : 
     970           0 :     return slotname;
     971             : }

Generated by: LCOV version 1.13