Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * tablesync.c
3 : * PostgreSQL logical replication: initial table data synchronization
4 : *
5 : * Copyright (c) 2012-2026, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/tablesync.c
9 : *
10 : * NOTES
11 : * This file contains code for initial table data synchronization for
12 : * logical replication.
13 : *
14 : * The initial data synchronization is done separately for each table,
15 : * in a separate apply worker that only fetches the initial snapshot data
16 : * from the publisher and then synchronizes the position in the stream with
17 : * the leader apply worker.
18 : *
19 : * There are several reasons for doing the synchronization this way:
20 : * - It allows us to parallelize the initial data synchronization
21 : * which lowers the time needed for it to happen.
22 : * - The initial synchronization does not have to hold the xid and LSN
23 : * for the time it takes to copy data of all tables, causing less
24 : * bloat and lower disk consumption compared to doing the
25 : * synchronization in a single process for the whole database.
26 : * - It allows us to synchronize any tables added after the initial
27 : * synchronization has finished.
28 : *
29 : * The stream position synchronization works in multiple steps:
30 : * - Apply worker requests a tablesync worker to start, setting the new
31 : * table state to INIT.
32 : * - Tablesync worker starts; changes table state from INIT to DATASYNC while
33 : * copying.
34 : * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
35 : * worker specific) state to indicate when the copy phase has completed, so
36 : * if the worker crashes with this (non-memory) state then the copy will not
37 : * be re-attempted.
38 : * - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
39 : * - Apply worker periodically checks for tables in SYNCWAIT state. When
40 : * any appear, it sets the table state to CATCHUP and starts loop-waiting
41 : * until either the table state is set to SYNCDONE or the sync worker
42 : * exits.
43 : * - After the sync worker has seen the state change to CATCHUP, it will
44 : * read the stream and apply changes (acting like an apply worker) until
45 : * it catches up to the specified stream position. Then it sets the
46 : * state to SYNCDONE. There might be zero changes applied between
47 : * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
48 : * apply worker.
49 : * - Once the state is set to SYNCDONE, the apply will continue tracking
50 : * the table until it reaches the SYNCDONE stream position, at which
51 : * point it sets state to READY and stops tracking. Again, there might
52 : * be zero changes in between.
53 : *
54 : * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
55 : * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
56 : *
57 : * The catalog pg_subscription_rel is used to keep information about
58 : * subscribed tables and their state. The catalog holds all states
59 : * except SYNCWAIT and CATCHUP which are only in shared memory.
60 : *
61 : * Example flows look like this:
62 : * - Apply is in front:
63 : * sync:8
64 : * -> set in catalog FINISHEDCOPY
65 : * -> set in memory SYNCWAIT
66 : * apply:10
67 : * -> set in memory CATCHUP
68 : * -> enter wait-loop
69 : * sync:10
70 : * -> set in catalog SYNCDONE
71 : * -> exit
72 : * apply:10
73 : * -> exit wait-loop
74 : * -> continue rep
75 : * apply:11
76 : * -> set in catalog READY
77 : *
78 : * - Sync is in front:
79 : * sync:10
80 : * -> set in catalog FINISHEDCOPY
81 : * -> set in memory SYNCWAIT
82 : * apply:8
83 : * -> set in memory CATCHUP
84 : * -> continue per-table filtering
85 : * sync:10
86 : * -> set in catalog SYNCDONE
87 : * -> exit
88 : * apply:10
89 : * -> set in catalog READY
90 : * -> stop per-table filtering
91 : * -> continue rep
92 : *-------------------------------------------------------------------------
93 : */
94 :
95 : #include "postgres.h"
96 :
97 : #include "access/table.h"
98 : #include "access/xact.h"
99 : #include "catalog/indexing.h"
100 : #include "catalog/pg_subscription_rel.h"
101 : #include "catalog/pg_type.h"
102 : #include "commands/copy.h"
103 : #include "miscadmin.h"
104 : #include "nodes/makefuncs.h"
105 : #include "parser/parse_relation.h"
106 : #include "pgstat.h"
107 : #include "replication/logicallauncher.h"
108 : #include "replication/logicalrelation.h"
109 : #include "replication/logicalworker.h"
110 : #include "replication/origin.h"
111 : #include "replication/slot.h"
112 : #include "replication/walreceiver.h"
113 : #include "replication/worker_internal.h"
114 : #include "storage/ipc.h"
115 : #include "storage/lmgr.h"
116 : #include "utils/acl.h"
117 : #include "utils/array.h"
118 : #include "utils/builtins.h"
119 : #include "utils/lsyscache.h"
120 : #include "utils/rls.h"
121 : #include "utils/snapmgr.h"
122 : #include "utils/syscache.h"
123 : #include "utils/usercontext.h"
124 :
125 : List *table_states_not_ready = NIL;
126 :
127 : static StringInfo copybuf = NULL;
128 :
129 : /*
130 : * Wait until the relation sync state is set in the catalog to the expected
131 : * one; return true when it happens.
132 : *
133 : * Returns false if the table sync worker or the table itself have
134 : * disappeared, or the table state has been reset.
135 : *
136 : * Currently, this is used in the apply worker when transitioning from
137 : * CATCHUP state to SYNCDONE.
138 : */
139 : static bool
140 360 : wait_for_table_state_change(Oid relid, char expected_state)
141 : {
142 : char state;
143 :
144 : for (;;)
145 406 : {
146 : LogicalRepWorker *worker;
147 : XLogRecPtr statelsn;
148 :
149 766 : CHECK_FOR_INTERRUPTS();
150 :
151 766 : InvalidateCatalogSnapshot();
152 766 : state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
153 : relid, &statelsn);
154 :
155 766 : if (state == SUBREL_STATE_UNKNOWN)
156 0 : break;
157 :
158 766 : if (state == expected_state)
159 360 : return true;
160 :
161 : /* Check if the sync worker is still running and bail if not. */
162 406 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
163 406 : worker = logicalrep_worker_find(WORKERTYPE_TABLESYNC,
164 406 : MyLogicalRepWorker->subid, relid,
165 : false);
166 406 : LWLockRelease(LogicalRepWorkerLock);
167 406 : if (!worker)
168 0 : break;
169 :
170 406 : (void) WaitLatch(MyLatch,
171 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
172 : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
173 :
174 406 : ResetLatch(MyLatch);
175 : }
176 :
177 0 : return false;
178 : }
179 :
180 : /*
181 : * Wait until the apply worker changes the state of our synchronization
182 : * worker to the expected one.
183 : *
184 : * Used when transitioning from SYNCWAIT state to CATCHUP.
185 : *
186 : * Returns false if the apply worker has disappeared.
187 : */
188 : static bool
189 372 : wait_for_worker_state_change(char expected_state)
190 : {
191 : int rc;
192 :
193 : for (;;)
194 372 : {
195 : LogicalRepWorker *worker;
196 :
197 744 : CHECK_FOR_INTERRUPTS();
198 :
199 : /*
200 : * Done if already in correct state. (We assume this fetch is atomic
201 : * enough to not give a misleading answer if we do it with no lock.)
202 : */
203 744 : if (MyLogicalRepWorker->relstate == expected_state)
204 372 : return true;
205 :
206 : /*
207 : * Bail out if the apply worker has died, else signal it we're
208 : * waiting.
209 : */
210 372 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
211 372 : worker = logicalrep_worker_find(WORKERTYPE_APPLY,
212 372 : MyLogicalRepWorker->subid, InvalidOid,
213 : false);
214 372 : if (worker && worker->proc)
215 372 : logicalrep_worker_wakeup_ptr(worker);
216 372 : LWLockRelease(LogicalRepWorkerLock);
217 372 : if (!worker)
218 0 : break;
219 :
220 : /*
221 : * Wait. We expect to get a latch signal back from the apply worker,
222 : * but use a timeout in case it dies without sending one.
223 : */
224 372 : rc = WaitLatch(MyLatch,
225 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
226 : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
227 :
228 372 : if (rc & WL_LATCH_SET)
229 372 : ResetLatch(MyLatch);
230 : }
231 :
232 0 : return false;
233 : }
234 :
/*
 * Handle table synchronization cooperation from the synchronization
 * worker.
 *
 * If the sync worker is in CATCHUP state and reached (or passed) the
 * predetermined synchronization point in the WAL stream, mark the table as
 * SYNCDONE and finish.
 *
 * Note: on the SYNCDONE path this function does not return; it ends in
 * FinishSyncWorker().
 */
void
ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
{
	/* relmutex guards relstate/relstate_lsn against concurrent access. */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);

	if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
		current_lsn >= MyLogicalRepWorker->relstate_lsn)
	{
		TimeLineID	tli;
		char		syncslotname[NAMEDATALEN] = {0};
		char		originname[NAMEDATALEN] = {0};

		/* Advance our in-memory state first, then release the spinlock. */
		MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
		MyLogicalRepWorker->relstate_lsn = current_lsn;

		SpinLockRelease(&MyLogicalRepWorker->relmutex);

		/*
		 * UpdateSubscriptionRelState must be called within a transaction.
		 */
		if (!IsTransactionState())
			StartTransactionCommand();

		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
								   MyLogicalRepWorker->relid,
								   MyLogicalRepWorker->relstate,
								   MyLogicalRepWorker->relstate_lsn,
								   false);

		/*
		 * End streaming so that LogRepWorkerWalRcvConn can be used to drop
		 * the slot.
		 */
		walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);

		/*
		 * Cleanup the tablesync slot.
		 *
		 * This has to be done after updating the state because otherwise if
		 * there is an error while doing the database operations we won't be
		 * able to rollback dropped slot.
		 */
		ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
										MyLogicalRepWorker->relid,
										syncslotname,
										sizeof(syncslotname));

		/*
		 * It is important to give an error if we are unable to drop the slot,
		 * otherwise, it won't be dropped till the corresponding subscription
		 * is dropped. So passing missing_ok = false.
		 */
		ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);

		CommitTransactionCommand();
		pgstat_report_stat(false);

		/*
		 * Start a new transaction to clean up the tablesync origin tracking.
		 * This transaction will be ended within the FinishSyncWorker(). Now,
		 * even, if we fail to remove this here, the apply worker will ensure
		 * to clean it up afterward.
		 *
		 * We need to do this after the table state is set to SYNCDONE.
		 * Otherwise, if an error occurs while performing the database
		 * operation, the worker will be restarted and the in-memory state of
		 * replication progress (remote_lsn) won't be rolled-back which would
		 * have been cleared before restart. So, the restarted worker will use
		 * invalid replication progress state resulting in replay of
		 * transactions that have already been applied.
		 */
		StartTransactionCommand();

		ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
										   MyLogicalRepWorker->relid,
										   originname,
										   sizeof(originname));

		/*
		 * Resetting the origin session removes the ownership of the slot.
		 * This is needed to allow the origin to be dropped.
		 */
		replorigin_session_reset();
		replorigin_xact_clear(true);

		/*
		 * Drop the tablesync's origin tracking if exists.
		 *
		 * There is a chance that the user is concurrently performing refresh
		 * for the subscription where we remove the table state and its origin
		 * or the apply worker would have removed this origin. So passing
		 * missing_ok = true.
		 */
		replorigin_drop_by_name(originname, true, false);

		/* Ends the transaction started above and finishes this worker. */
		FinishSyncWorker();
	}
	else
		SpinLockRelease(&MyLogicalRepWorker->relmutex);
}
343 :
/*
 * Handle table synchronization cooperation from the apply worker.
 *
 * Walk over all subscription tables that are individually tracked by the
 * apply process (currently, all that have state other than
 * SUBREL_STATE_READY) and manage synchronization for them.
 *
 * If there are tables that need synchronizing and are not being synchronized
 * yet, start sync workers for them (if there are free slots for sync
 * workers). To prevent starting the sync worker for the same relation at a
 * high frequency after a failure, we store its last start time with each sync
 * state info. We start the sync worker for the same relation after waiting
 * at least wal_retrieve_retry_interval.
 *
 * For tables that are being synchronized already, check if sync workers
 * either need action from the apply worker or have finished. This is the
 * SYNCWAIT to CATCHUP transition.
 *
 * If the synchronization position is reached (SYNCDONE), then the table can
 * be marked as READY and is no longer tracked.
 */
void
ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
{
	/* Hash entry type mapping a relation to its last sync-worker start. */
	struct tablesync_start_time_mapping
	{
		Oid			relid;
		TimestampTz last_start_time;
	};
	/* static: survives across calls, for rate-limiting worker restarts */
	static HTAB *last_start_times = NULL;
	ListCell   *lc;
	bool		started_tx;
	bool		should_exit = false;
	Relation	rel = NULL;

	Assert(!IsTransactionState());

	/* We need up-to-date sync state info for subscription tables here. */
	FetchRelationStates(NULL, NULL, &started_tx);

	/*
	 * Prepare a hash table for tracking last start times of workers, to avoid
	 * immediate restarts. We don't need it if there are no tables that need
	 * syncing.
	 */
	if (table_states_not_ready != NIL && !last_start_times)
	{
		HASHCTL		ctl;

		ctl.keysize = sizeof(Oid);
		ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
		last_start_times = hash_create("Logical replication table sync worker start times",
									   256, &ctl, HASH_ELEM | HASH_BLOBS);
	}

	/*
	 * Clean up the hash table when we're done with all tables (just to
	 * release the bit of memory).
	 */
	else if (table_states_not_ready == NIL && last_start_times)
	{
		hash_destroy(last_start_times);
		last_start_times = NULL;
	}

	/*
	 * Process all tables that are being synchronized.
	 */
	foreach(lc, table_states_not_ready)
	{
		SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);

		/* Catalog access below requires an open transaction. */
		if (!started_tx)
		{
			StartTransactionCommand();
			started_tx = true;
		}

		Assert(get_rel_relkind(rstate->relid) != RELKIND_SEQUENCE);

		if (rstate->state == SUBREL_STATE_SYNCDONE)
		{
			/*
			 * Apply has caught up to the position where the table sync has
			 * finished. Mark the table as ready so that the apply will just
			 * continue to replicate it normally.
			 */
			if (current_lsn >= rstate->lsn)
			{
				char		originname[NAMEDATALEN];

				rstate->state = SUBREL_STATE_READY;
				rstate->lsn = current_lsn;

				/*
				 * Remove the tablesync origin tracking if exists.
				 *
				 * There is a chance that the user is concurrently performing
				 * refresh for the subscription where we remove the table
				 * state and its origin or the tablesync worker would have
				 * already removed this origin. We can't rely on tablesync
				 * worker to remove the origin tracking as if there is any
				 * error while dropping we won't restart it to drop the
				 * origin. So passing missing_ok = true.
				 *
				 * Lock the subscription and origin in the same order as we
				 * are doing during DDL commands to avoid deadlocks. See
				 * AlterSubscription_refresh.
				 */
				LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
								 0, AccessShareLock);

				/* Open pg_subscription_rel once; reused for later tables. */
				if (!rel)
					rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);

				ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
												   rstate->relid,
												   originname,
												   sizeof(originname));
				replorigin_drop_by_name(originname, true, false);

				/*
				 * Update the state to READY only after the origin cleanup.
				 */
				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
										   rstate->relid, rstate->state,
										   rstate->lsn, true);
			}
		}
		else
		{
			LogicalRepWorker *syncworker;

			/*
			 * Look for a sync worker for this relation.
			 */
			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

			syncworker = logicalrep_worker_find(WORKERTYPE_TABLESYNC,
												MyLogicalRepWorker->subid,
												rstate->relid, false);

			if (syncworker)
			{
				/* Found one, update our copy of its state */
				SpinLockAcquire(&syncworker->relmutex);
				rstate->state = syncworker->relstate;
				rstate->lsn = syncworker->relstate_lsn;
				if (rstate->state == SUBREL_STATE_SYNCWAIT)
				{
					/*
					 * Sync worker is waiting for apply. Tell sync worker it
					 * can catchup now.
					 */
					syncworker->relstate = SUBREL_STATE_CATCHUP;
					syncworker->relstate_lsn =
						Max(syncworker->relstate_lsn, current_lsn);
				}
				SpinLockRelease(&syncworker->relmutex);

				/* If we told worker to catch up, wait for it. */
				if (rstate->state == SUBREL_STATE_SYNCWAIT)
				{
					/* Signal the sync worker, as it may be waiting for us. */
					if (syncworker->proc)
						logicalrep_worker_wakeup_ptr(syncworker);

					/* Now safe to release the LWLock */
					LWLockRelease(LogicalRepWorkerLock);

					if (started_tx)
					{
						/*
						 * We must commit the existing transaction to release
						 * the existing locks before entering a busy loop.
						 * This is required to avoid any undetected deadlocks
						 * due to any existing lock as deadlock detector won't
						 * be able to detect the waits on the latch.
						 *
						 * Also close any tables prior to the commit.
						 */
						if (rel)
						{
							table_close(rel, NoLock);
							rel = NULL;
						}
						CommitTransactionCommand();
						pgstat_report_stat(false);
					}

					/*
					 * Enter busy loop and wait for synchronization worker to
					 * reach expected state (or die trying).
					 */
					StartTransactionCommand();
					started_tx = true;

					wait_for_table_state_change(rstate->relid,
												SUBREL_STATE_SYNCDONE);
				}
				else
					LWLockRelease(LogicalRepWorkerLock);
			}
			else
			{
				/*
				 * If there is no sync worker for this table yet, count
				 * running sync workers for this subscription, while we have
				 * the lock.
				 */
				int			nsyncworkers =
					logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
				struct tablesync_start_time_mapping *hentry;
				bool		found;

				/* Now safe to release the LWLock */
				LWLockRelease(LogicalRepWorkerLock);

				/* Create the per-relation entry on first encounter. */
				hentry = hash_search(last_start_times, &rstate->relid,
									 HASH_ENTER, &found);
				if (!found)
					hentry->last_start_time = 0;

				launch_sync_worker(WORKERTYPE_TABLESYNC, nsyncworkers,
								   rstate->relid, &hentry->last_start_time);
			}
		}
	}

	/* Close table if opened */
	if (rel)
		table_close(rel, NoLock);


	if (started_tx)
	{
		/*
		 * Even when the two_phase mode is requested by the user, it remains
		 * as 'pending' until all tablesyncs have reached READY state.
		 *
		 * When this happens, we restart the apply worker and (if the
		 * conditions are still ok) then the two_phase tri-state will become
		 * 'enabled' at that time.
		 *
		 * Note: If the subscription has no tables then leave the state as
		 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
		 * work.
		 */
		if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
		{
			CommandCounterIncrement();	/* make updates visible */
			if (AllTablesyncsReady())
			{
				ereport(LOG,
						(errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
								MySubscription->name)));
				should_exit = true;
			}
		}

		CommitTransactionCommand();
		pgstat_report_stat(true);
	}

	if (should_exit)
	{
		/*
		 * Reset the last-start time for this worker so that the launcher will
		 * restart it without waiting for wal_retrieve_retry_interval.
		 */
		ApplyLauncherForgetWorkerStartTime(MySubscription->oid);

		proc_exit(0);
	}
}
619 :
620 : /*
621 : * Create list of columns for COPY based on logical relation mapping.
622 : */
623 : static List *
624 394 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
625 : {
626 394 : List *attnamelist = NIL;
627 : int i;
628 :
629 1052 : for (i = 0; i < rel->remoterel.natts; i++)
630 : {
631 658 : attnamelist = lappend(attnamelist,
632 658 : makeString(rel->remoterel.attnames[i]));
633 : }
634 :
635 :
636 394 : return attnamelist;
637 : }
638 :
639 : /*
640 : * Data source callback for the COPY FROM, which reads from the remote
641 : * connection and passes the data back to our local COPY.
642 : */
643 : static int
644 28008 : copy_read_data(void *outbuf, int minread, int maxread)
645 : {
646 28008 : int bytesread = 0;
647 : int avail;
648 :
649 : /* If there are some leftover data from previous read, use it. */
650 28008 : avail = copybuf->len - copybuf->cursor;
651 28008 : if (avail)
652 : {
653 0 : if (avail > maxread)
654 0 : avail = maxread;
655 0 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
656 0 : copybuf->cursor += avail;
657 0 : maxread -= avail;
658 0 : bytesread += avail;
659 : }
660 :
661 28038 : while (maxread > 0 && bytesread < minread)
662 : {
663 28038 : pgsocket fd = PGINVALID_SOCKET;
664 : int len;
665 28038 : char *buf = NULL;
666 :
667 : for (;;)
668 : {
669 : /* Try read the data. */
670 28038 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
671 :
672 28038 : CHECK_FOR_INTERRUPTS();
673 :
674 28038 : if (len == 0)
675 30 : break;
676 28008 : else if (len < 0)
677 28008 : return bytesread;
678 : else
679 : {
680 : /* Process the data */
681 27618 : copybuf->data = buf;
682 27618 : copybuf->len = len;
683 27618 : copybuf->cursor = 0;
684 :
685 27618 : avail = copybuf->len - copybuf->cursor;
686 27618 : if (avail > maxread)
687 0 : avail = maxread;
688 27618 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
689 27618 : outbuf = (char *) outbuf + avail;
690 27618 : copybuf->cursor += avail;
691 27618 : maxread -= avail;
692 27618 : bytesread += avail;
693 : }
694 :
695 27618 : if (maxread <= 0 || bytesread >= minread)
696 27618 : return bytesread;
697 : }
698 :
699 : /*
700 : * Wait for more data or latch.
701 : */
702 30 : (void) WaitLatchOrSocket(MyLatch,
703 : WL_SOCKET_READABLE | WL_LATCH_SET |
704 : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
705 : fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
706 :
707 30 : ResetLatch(MyLatch);
708 : }
709 :
710 0 : return bytesread;
711 : }
712 :
713 :
714 : /*
715 : * Get information about remote relation in similar fashion the RELATION
716 : * message provides during replication.
717 : *
718 : * This function also returns (a) the relation qualifications to be used in
719 : * the COPY command, and (b) whether the remote relation has published any
720 : * generated column.
721 : */
722 : static void
723 398 : fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
724 : List **qual, bool *gencol_published)
725 : {
726 : WalRcvExecResult *res;
727 : StringInfoData cmd;
728 : TupleTableSlot *slot;
729 398 : Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
730 398 : Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
731 398 : Oid qualRow[] = {TEXTOID};
732 : bool isnull;
733 : int natt;
734 398 : StringInfo pub_names = NULL;
735 398 : Bitmapset *included_cols = NULL;
736 398 : int server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
737 :
738 398 : lrel->nspname = nspname;
739 398 : lrel->relname = relname;
740 :
741 : /* First fetch Oid and replica identity. */
742 398 : initStringInfo(&cmd);
743 398 : appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
744 : " FROM pg_catalog.pg_class c"
745 : " INNER JOIN pg_catalog.pg_namespace n"
746 : " ON (c.relnamespace = n.oid)"
747 : " WHERE n.nspname = %s"
748 : " AND c.relname = %s",
749 : quote_literal_cstr(nspname),
750 : quote_literal_cstr(relname));
751 398 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
752 : lengthof(tableRow), tableRow);
753 :
754 398 : if (res->status != WALRCV_OK_TUPLES)
755 0 : ereport(ERROR,
756 : (errcode(ERRCODE_CONNECTION_FAILURE),
757 : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
758 : nspname, relname, res->err)));
759 :
760 398 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
761 398 : if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
762 0 : ereport(ERROR,
763 : (errcode(ERRCODE_UNDEFINED_OBJECT),
764 : errmsg("table \"%s.%s\" not found on publisher",
765 : nspname, relname)));
766 :
767 398 : lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
768 : Assert(!isnull);
769 398 : lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
770 : Assert(!isnull);
771 398 : lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
772 : Assert(!isnull);
773 :
774 398 : ExecDropSingleTupleTableSlot(slot);
775 398 : walrcv_clear_result(res);
776 :
777 :
778 : /*
779 : * Get column lists for each relation.
780 : *
781 : * We need to do this before fetching info about column names and types,
782 : * so that we can skip columns that should not be replicated.
783 : */
784 398 : if (server_version >= 150000)
785 : {
786 : WalRcvExecResult *pubres;
787 : TupleTableSlot *tslot;
788 398 : Oid attrsRow[] = {INT2VECTOROID};
789 :
790 : /* Build the pub_names comma-separated string. */
791 398 : pub_names = makeStringInfo();
792 398 : GetPublicationsStr(MySubscription->publications, pub_names, true);
793 :
794 : /*
795 : * Fetch info about column lists for the relation (from all the
796 : * publications).
797 : */
798 398 : resetStringInfo(&cmd);
799 398 : appendStringInfo(&cmd,
800 : "SELECT DISTINCT"
801 : " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
802 : " THEN NULL ELSE gpt.attrs END)"
803 : " FROM pg_publication p,"
804 : " LATERAL pg_get_publication_tables(p.pubname) gpt,"
805 : " pg_class c"
806 : " WHERE gpt.relid = %u AND c.oid = gpt.relid"
807 : " AND p.pubname IN ( %s )",
808 : lrel->remoteid,
809 : pub_names->data);
810 :
811 398 : pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
812 : lengthof(attrsRow), attrsRow);
813 :
814 398 : if (pubres->status != WALRCV_OK_TUPLES)
815 0 : ereport(ERROR,
816 : (errcode(ERRCODE_CONNECTION_FAILURE),
817 : errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
818 : nspname, relname, pubres->err)));
819 :
820 : /*
821 : * We don't support the case where the column list is different for
822 : * the same table when combining publications. See comments atop
823 : * fetch_relation_list. So there should be only one row returned.
824 : * Although we already checked this when creating the subscription, we
825 : * still need to check here in case the column list was changed after
826 : * creating the subscription and before the sync worker is started.
827 : */
828 398 : if (tuplestore_tuple_count(pubres->tuplestore) > 1)
829 0 : ereport(ERROR,
830 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
831 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
832 : nspname, relname));
833 :
834 : /*
835 : * Get the column list and build a single bitmap with the attnums.
836 : *
837 : * If we find a NULL value, it means all the columns should be
838 : * replicated.
839 : */
840 398 : tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
841 398 : if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
842 : {
843 398 : Datum cfval = slot_getattr(tslot, 1, &isnull);
844 :
845 398 : if (!isnull)
846 : {
847 : ArrayType *arr;
848 : int nelems;
849 : int16 *elems;
850 :
851 44 : arr = DatumGetArrayTypeP(cfval);
852 44 : nelems = ARR_DIMS(arr)[0];
853 44 : elems = (int16 *) ARR_DATA_PTR(arr);
854 :
855 118 : for (natt = 0; natt < nelems; natt++)
856 74 : included_cols = bms_add_member(included_cols, elems[natt]);
857 : }
858 :
859 398 : ExecClearTuple(tslot);
860 : }
861 398 : ExecDropSingleTupleTableSlot(tslot);
862 :
863 398 : walrcv_clear_result(pubres);
864 : }
865 :
866 : /*
867 : * Now fetch column names and types.
868 : */
869 398 : resetStringInfo(&cmd);
870 398 : appendStringInfoString(&cmd,
871 : "SELECT a.attnum,"
872 : " a.attname,"
873 : " a.atttypid,"
874 : " a.attnum = ANY(i.indkey)");
875 :
876 : /* Generated columns can be replicated since version 18. */
877 398 : if (server_version >= 180000)
878 398 : appendStringInfoString(&cmd, ", a.attgenerated != ''");
879 :
880 796 : appendStringInfo(&cmd,
881 : " FROM pg_catalog.pg_attribute a"
882 : " LEFT JOIN pg_catalog.pg_index i"
883 : " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
884 : " WHERE a.attnum > 0::pg_catalog.int2"
885 : " AND NOT a.attisdropped %s"
886 : " AND a.attrelid = %u"
887 : " ORDER BY a.attnum",
888 : lrel->remoteid,
889 398 : (server_version >= 120000 && server_version < 180000 ?
890 : "AND a.attgenerated = ''" : ""),
891 : lrel->remoteid);
892 398 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
893 : server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
894 :
895 398 : if (res->status != WALRCV_OK_TUPLES)
896 0 : ereport(ERROR,
897 : (errcode(ERRCODE_CONNECTION_FAILURE),
898 : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
899 : nspname, relname, res->err)));
900 :
901 : /* We don't know the number of rows coming, so allocate enough space. */
902 398 : lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
903 398 : lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
904 398 : lrel->attkeys = NULL;
905 :
906 : /*
907 : * Store the columns as a list of names. Ignore those that are not
908 : * present in the column list, if there is one.
909 : */
910 398 : natt = 0;
911 398 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
912 1130 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
913 : {
914 : char *rel_colname;
915 : AttrNumber attnum;
916 :
917 732 : attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
918 : Assert(!isnull);
919 :
920 : /* If the column is not in the column list, skip it. */
921 732 : if (included_cols != NULL && !bms_is_member(attnum, included_cols))
922 : {
923 62 : ExecClearTuple(slot);
924 62 : continue;
925 : }
926 :
927 670 : rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
928 : Assert(!isnull);
929 :
930 670 : lrel->attnames[natt] = rel_colname;
931 670 : lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
932 : Assert(!isnull);
933 :
934 670 : if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
935 220 : lrel->attkeys = bms_add_member(lrel->attkeys, natt);
936 :
937 : /* Remember if the remote table has published any generated column. */
938 670 : if (server_version >= 180000 && !(*gencol_published))
939 : {
940 670 : *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
941 : Assert(!isnull);
942 : }
943 :
944 : /* Should never happen. */
945 670 : if (++natt >= MaxTupleAttributeNumber)
946 0 : elog(ERROR, "too many columns in remote table \"%s.%s\"",
947 : nspname, relname);
948 :
949 670 : ExecClearTuple(slot);
950 : }
951 398 : ExecDropSingleTupleTableSlot(slot);
952 :
953 398 : lrel->natts = natt;
954 :
955 398 : walrcv_clear_result(res);
956 :
957 : /*
958 : * Get relation's row filter expressions. DISTINCT avoids the same
959 : * expression of a table in multiple publications from being included
960 : * multiple times in the final expression.
961 : *
962 : * We need to copy the row even if it matches just one of the
963 : * publications, so we later combine all the quals with OR.
964 : *
965 : * For initial synchronization, row filtering can be ignored in following
966 : * cases:
967 : *
968 : * 1) one of the subscribed publications for the table hasn't specified
969 : * any row filter
970 : *
971 : * 2) one of the subscribed publications has puballtables set to true
972 : *
973 : * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
974 : * that includes this relation
975 : */
976 398 : if (server_version >= 150000)
977 : {
978 : /* Reuse the already-built pub_names. */
979 : Assert(pub_names != NULL);
980 :
981 : /* Check for row filters. */
982 398 : resetStringInfo(&cmd);
983 398 : appendStringInfo(&cmd,
984 : "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
985 : " FROM pg_publication p,"
986 : " LATERAL pg_get_publication_tables(p.pubname) gpt"
987 : " WHERE gpt.relid = %u"
988 : " AND p.pubname IN ( %s )",
989 : lrel->remoteid,
990 : pub_names->data);
991 :
992 398 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
993 :
994 398 : if (res->status != WALRCV_OK_TUPLES)
995 0 : ereport(ERROR,
996 : (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
997 : nspname, relname, res->err)));
998 :
999 : /*
1000 : * Multiple row filter expressions for the same table will be combined
1001 : * by COPY using OR. If any of the filter expressions for this table
1002 : * are null, it means the whole table will be copied. In this case it
1003 : * is not necessary to construct a unified row filter expression at
1004 : * all.
1005 : */
1006 398 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
1007 428 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1008 : {
1009 406 : Datum rf = slot_getattr(slot, 1, &isnull);
1010 :
1011 406 : if (!isnull)
1012 30 : *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
1013 : else
1014 : {
1015 : /* Ignore filters and cleanup as necessary. */
1016 376 : if (*qual)
1017 : {
1018 6 : list_free_deep(*qual);
1019 6 : *qual = NIL;
1020 : }
1021 376 : break;
1022 : }
1023 :
1024 30 : ExecClearTuple(slot);
1025 : }
1026 398 : ExecDropSingleTupleTableSlot(slot);
1027 :
1028 398 : walrcv_clear_result(res);
1029 398 : destroyStringInfo(pub_names);
1030 : }
1031 :
1032 398 : pfree(cmd.data);
1033 398 : }
1034 :
/*
 * Copy existing data of a table from publisher.
 *
 * Builds and runs a remote COPY ... TO STDOUT on the publisher, then feeds
 * the streamed rows into a local COPY FROM via copy_read_data.
 *
 * Caller is responsible for locking the local relation.
 */
static void
copy_table(Relation rel)
{
	LogicalRepRelMapEntry *relmapentry;
	LogicalRepRelation lrel;
	List	   *qual = NIL;		/* OR'able row-filter expressions, if any */
	WalRcvExecResult *res;
	StringInfoData cmd;
	CopyFromState cstate;
	List	   *attnamelist;
	ParseState *pstate;
	List	   *options = NIL;	/* COPY FROM options (e.g. binary format) */
	bool		gencol_published = false;

	/* Get the publisher relation info. */
	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
							RelationGetRelationName(rel), &lrel, &qual,
							&gencol_published);

	/* Put the relation into relmap. */
	logicalrep_relmap_update(&lrel);

	/* Map the publisher relation to local one. */
	relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
	Assert(rel == relmapentry->localrel);

	/* Start copy on the publisher. */
	initStringInfo(&cmd);

	/* Regular or partitioned table with no row filter or generated columns */
	if ((lrel.relkind == RELKIND_RELATION || lrel.relkind == RELKIND_PARTITIONED_TABLE)
		&& qual == NIL && !gencol_published)
	{
		/* Simple case: plain "COPY table (cols) TO STDOUT". */
		appendStringInfo(&cmd, "COPY %s",
						 quote_qualified_identifier(lrel.nspname, lrel.relname));

		/* If the table has columns, then specify the columns */
		if (lrel.natts)
		{
			appendStringInfoString(&cmd, " (");

			/*
			 * XXX Do we need to list the columns in all cases? Maybe we're
			 * replicating all columns?
			 */
			for (int i = 0; i < lrel.natts; i++)
			{
				if (i > 0)
					appendStringInfoString(&cmd, ", ");

				appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
			}

			appendStringInfoChar(&cmd, ')');
		}

		appendStringInfoString(&cmd, " TO STDOUT");
	}
	else
	{
		/*
		 * For non-tables and tables with row filters, we need to do COPY
		 * (SELECT ...), but we can't just do SELECT * because we may need to
		 * copy only subset of columns including generated columns. For tables
		 * with any row filters, build a SELECT query with OR'ed row filters
		 * for COPY.
		 *
		 * We also need to use this same COPY (SELECT ...) syntax when
		 * generated columns are published, because copy of generated columns
		 * is not supported by the normal COPY.
		 */
		appendStringInfoString(&cmd, "COPY (SELECT ");
		for (int i = 0; i < lrel.natts; i++)
		{
			appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
			if (i < lrel.natts - 1)
				appendStringInfoString(&cmd, ", ");
		}

		appendStringInfoString(&cmd, " FROM ");

		/*
		 * For regular tables, make sure we don't copy data from a child that
		 * inherits the named table as those will be copied separately.
		 */
		if (lrel.relkind == RELKIND_RELATION)
			appendStringInfoString(&cmd, "ONLY ");

		appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
		/* list of OR'ed filters */
		if (qual != NIL)
		{
			ListCell   *lc;
			char	   *q = strVal(linitial(qual));

			appendStringInfo(&cmd, " WHERE %s", q);
			for_each_from(lc, qual, 1)
			{
				q = strVal(lfirst(lc));
				appendStringInfo(&cmd, " OR %s", q);
			}
			list_free_deep(qual);
		}

		appendStringInfoString(&cmd, ") TO STDOUT");
	}

	/*
	 * Prior to v16, initial table synchronization will use text format even
	 * if the binary option is enabled for a subscription.
	 */
	if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
		MySubscription->binary)
	{
		appendStringInfoString(&cmd, " WITH (FORMAT binary)");
		options = list_make1(makeDefElem("format",
										 (Node *) makeString("binary"), -1));
	}

	/* Kick off the remote COPY; the result stream is consumed below. */
	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
	pfree(cmd.data);
	if (res->status != WALRCV_OK_COPY_OUT)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_FAILURE),
				 errmsg("could not start initial contents copy for table \"%s.%s\": %s",
						lrel.nspname, lrel.relname, res->err)));
	walrcv_clear_result(res);

	/* Buffer used by copy_read_data to hand rows to local COPY FROM. */
	copybuf = makeStringInfo();

	pstate = make_parsestate(NULL);
	(void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
										 NULL, false, false);

	attnamelist = make_copy_attnamelist(relmapentry);
	cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);

	/* Do the copy */
	(void) CopyFrom(cstate);

	logicalrep_rel_close(relmapentry, NoLock);
}
1182 :
1183 : /*
1184 : * Determine the tablesync slot name.
1185 : *
1186 : * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
1187 : * on slot name length. We append system_identifier to avoid slot_name
1188 : * collision with subscriptions in other clusters. With the current scheme
1189 : * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
1190 : * length of slot_name will be 50.
1191 : *
1192 : * The returned slot name is stored in the supplied buffer (syncslotname) with
1193 : * the given size.
1194 : *
1195 : * Note: We don't use the subscription slot name as part of tablesync slot name
1196 : * because we are responsible for cleaning up these slots and it could become
1197 : * impossible to recalculate what name to cleanup if the subscription slot name
1198 : * had changed.
1199 : */
1200 : void
1201 780 : ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
1202 : char *syncslotname, Size szslot)
1203 : {
1204 780 : snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
1205 : relid, GetSystemIdentifier());
1206 780 : }
1207 :
/*
 * Start syncing the table in the sync worker.
 *
 * If nothing needs to be done to sync the table, we exit the worker without
 * any further action.
 *
 * On return, *origin_startpos is set to the LSN from which streaming should
 * continue after the copy.  The returned slot name is palloc'ed in current
 * memory context.
 */
static char *
LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
{
	char	   *slotname;
	char	   *err;
	char		relstate;
	XLogRecPtr	relstate_lsn;
	Relation	rel;
	AclResult	aclresult;
	WalRcvExecResult *res;
	char		originname[NAMEDATALEN];
	ReplOriginId originid;
	UserContext ucxt;
	bool		must_use_password;
	bool		run_as_owner;

	/* Check the state of the table synchronization. */
	StartTransactionCommand();
	relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
									   MyLogicalRepWorker->relid,
									   &relstate_lsn);
	CommitTransactionCommand();

	/* Is the use of a password mandatory? */
	must_use_password = MySubscription->passwordrequired &&
		!MySubscription->ownersuperuser;

	/* Publish the fetched state in shared memory for the apply worker. */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
	MyLogicalRepWorker->relstate = relstate;
	MyLogicalRepWorker->relstate_lsn = relstate_lsn;
	SpinLockRelease(&MyLogicalRepWorker->relmutex);

	/*
	 * If synchronization is already done or no longer necessary, exit now
	 * that we've updated shared memory state.
	 */
	switch (relstate)
	{
		case SUBREL_STATE_SYNCDONE:
		case SUBREL_STATE_READY:
		case SUBREL_STATE_UNKNOWN:
			FinishSyncWorker(); /* doesn't return */
	}

	/* Calculate the name of the tablesync slot. */
	slotname = (char *) palloc(NAMEDATALEN);
	ReplicationSlotNameForTablesync(MySubscription->oid,
									MyLogicalRepWorker->relid,
									slotname,
									NAMEDATALEN);

	/*
	 * Here we use the slot name instead of the subscription name as the
	 * application_name, so that it is different from the leader apply worker,
	 * so that synchronous replication can distinguish them.
	 */
	LogRepWorkerWalRcvConn =
		walrcv_connect(MySubscription->conninfo, true, true,
					   must_use_password,
					   slotname, &err);
	if (LogRepWorkerWalRcvConn == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_FAILURE),
				 errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
						MySubscription->name, err)));

	Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
		   MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);

	/* Assign the origin tracking record name. */
	ReplicationOriginNameForLogicalRep(MySubscription->oid,
									   MyLogicalRepWorker->relid,
									   originname,
									   sizeof(originname));

	if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
	{
		/*
		 * We have previously errored out before finishing the copy so the
		 * replication slot might exist. We want to remove the slot if it
		 * already exists and proceed.
		 *
		 * XXX We could also instead try to drop the slot, last time we failed
		 * but for that, we might need to clean up the copy state as it might
		 * be in the middle of fetching the rows. Also, if there is a network
		 * breakdown then it wouldn't have succeeded so trying it next time
		 * seems like a better bet.
		 */
		ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
	}
	else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
	{
		/*
		 * The COPY phase was previously done, but tablesync then crashed
		 * before it was able to finish normally.
		 */
		StartTransactionCommand();

		/*
		 * The origin tracking name must already exist. It was created first
		 * time this tablesync was launched.
		 */
		originid = replorigin_by_name(originname, false);
		replorigin_session_setup(originid, 0);
		replorigin_xact_state.origin = originid;
		*origin_startpos = replorigin_session_get_progress(false);

		CommitTransactionCommand();

		/* Skip the copy entirely; resume from the recorded progress LSN. */
		goto copy_table_done;
	}

	/* Mark ourselves as actively copying in shared memory. */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
	MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
	MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
	SpinLockRelease(&MyLogicalRepWorker->relmutex);

	/*
	 * Update the state, create the replication origin, and make them visible
	 * to others.
	 */
	StartTransactionCommand();
	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
							   MyLogicalRepWorker->relid,
							   MyLogicalRepWorker->relstate,
							   MyLogicalRepWorker->relstate_lsn,
							   false);

	/*
	 * Create the replication origin in a separate transaction from the one
	 * that sets up the origin in shared memory. This prevents the risk that
	 * changes to the origin in shared memory cannot be rolled back if the
	 * transaction aborts.
	 */
	originid = replorigin_by_name(originname, true);
	if (!OidIsValid(originid))
		originid = replorigin_create(originname);

	CommitTransactionCommand();
	pgstat_report_stat(true);

	StartTransactionCommand();

	/*
	 * Use a standard write lock here. It might be better to disallow access
	 * to the table while it's being synchronized. But we don't want to block
	 * the main apply process from working and it has to open the relation in
	 * RowExclusiveLock when remapping remote relation id to local one.
	 */
	rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);

	/*
	 * Start a transaction in the remote node in REPEATABLE READ mode. This
	 * ensures that both the replication slot we create (see below) and the
	 * COPY are consistent with each other.
	 */
	res = walrcv_exec(LogRepWorkerWalRcvConn,
					  "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
					  0, NULL);
	if (res->status != WALRCV_OK_COMMAND)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_FAILURE),
				 errmsg("table copy could not start transaction on publisher: %s",
						res->err)));
	walrcv_clear_result(res);

	/*
	 * Create a new permanent logical decoding slot. This slot will be used
	 * for the catchup phase after COPY is done, so tell it to use the
	 * snapshot to make the final data consistent.
	 */
	walrcv_create_slot(LogRepWorkerWalRcvConn,
					   slotname, false /* permanent */ , false /* two_phase */ ,
					   MySubscription->failover,
					   CRS_USE_SNAPSHOT, origin_startpos);

	/*
	 * Advance the origin to the LSN got from walrcv_create_slot and then set
	 * up the origin. The advancement is WAL logged for the purpose of
	 * recovery. Locks are to prevent the replication origin from vanishing
	 * while advancing.
	 *
	 * The purpose of doing these before the copy is to avoid doing the copy
	 * again due to any error in advancing or setting up origin tracking.
	 */
	LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
	replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
					   true /* go backward */ , true /* WAL log */ );
	UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);

	replorigin_session_setup(originid, 0);
	replorigin_xact_state.origin = originid;

	/*
	 * If the user did not opt to run as the owner of the subscription
	 * ('run_as_owner'), then copy the table as the owner of the table.
	 */
	run_as_owner = MySubscription->runasowner;
	if (!run_as_owner)
		SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);

	/*
	 * Check that our table sync worker has permission to insert into the
	 * target table.
	 */
	aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
								  ACL_INSERT);
	if (aclresult != ACLCHECK_OK)
		aclcheck_error(aclresult,
					   get_relkind_objtype(rel->rd_rel->relkind),
					   RelationGetRelationName(rel));

	/*
	 * COPY FROM does not honor RLS policies. That is not a problem for
	 * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
	 * who has it implicitly), but other roles should not be able to
	 * circumvent RLS. Disallow logical replication into RLS enabled
	 * relations for such roles.
	 */
	if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
						GetUserNameFromId(GetUserId(), true),
						RelationGetRelationName(rel))));

	/* Now do the initial data copy */
	PushActiveSnapshot(GetTransactionSnapshot());
	copy_table(rel);
	PopActiveSnapshot();

	/* End the remote REPEATABLE READ transaction started above. */
	res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
	if (res->status != WALRCV_OK_COMMAND)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_FAILURE),
				 errmsg("table copy could not finish transaction on publisher: %s",
						res->err)));
	walrcv_clear_result(res);

	if (!run_as_owner)
		RestoreUserContext(&ucxt);

	table_close(rel, NoLock);

	/* Make the copy visible. */
	CommandCounterIncrement();

	/*
	 * Update the persisted state to indicate the COPY phase is done; make it
	 * visible to others.
	 */
	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
							   MyLogicalRepWorker->relid,
							   SUBREL_STATE_FINISHEDCOPY,
							   MyLogicalRepWorker->relstate_lsn,
							   false);

	CommitTransactionCommand();

copy_table_done:

	elog(DEBUG1,
		 "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X",
		 originname, LSN_FORMAT_ARGS(*origin_startpos));

	/*
	 * We are done with the initial data synchronization, update the state.
	 */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
	MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
	MyLogicalRepWorker->relstate_lsn = *origin_startpos;
	SpinLockRelease(&MyLogicalRepWorker->relmutex);

	/*
	 * Finally, wait until the leader apply worker tells us to catch up and
	 * then return to let LogicalRepApplyLoop do it.
	 */
	wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
	return slotname;
}
1497 :
/*
 * Execute the initial sync with error handling. Disable the subscription,
 * if it's required.
 *
 * On success, *origin_startpos is the LSN to stream from and *slotname is
 * the tablesync slot name, allocated in the long-lived ApplyContext on
 * return. Note that we don't handle FATAL errors which are probably because
 * of system resource error and are not repeatable.
 */
static void
start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
{
	char	   *sync_slotname = NULL;

	Assert(am_tablesync_worker());

	PG_TRY();
	{
		/* Call initial sync. */
		sync_slotname = LogicalRepSyncTableStart(origin_startpos);
	}
	PG_CATCH();
	{
		if (MySubscription->disableonerr)
			DisableSubscriptionAndExit();	/* doesn't return */
		else
		{
			/*
			 * Report the worker failed during table synchronization. Abort
			 * the current transaction so that the stats message is sent in an
			 * idle state.
			 */
			AbortOutOfAnyTransaction();
			pgstat_report_subscription_error(MySubscription->oid,
											 WORKERTYPE_TABLESYNC);

			PG_RE_THROW();
		}
	}
	PG_END_TRY();

	/* allocate slot name in long-lived context */
	*slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
	pfree(sync_slotname);
}
1542 :
/*
 * Runs the tablesync worker.
 *
 * It starts syncing tables. After a successful sync, sets streaming options
 * and starts streaming to catchup with apply worker.
 */
static void
run_tablesync_worker(void)
{
	char		originname[NAMEDATALEN];
	XLogRecPtr	origin_startpos = InvalidXLogRecPtr;
	char	   *slotname = NULL;
	WalRcvStreamOptions options;

	/* Perform the initial copy; fills in origin_startpos and slotname. */
	start_table_sync(&origin_startpos, &slotname);

	/* Recompute the per-table origin name for error context reporting. */
	ReplicationOriginNameForLogicalRep(MySubscription->oid,
									   MyLogicalRepWorker->relid,
									   originname,
									   sizeof(originname));

	set_apply_error_context_origin(originname);

	set_stream_options(&options, slotname, &origin_startpos);

	walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);

	/* Apply the changes till we catchup with the apply worker. */
	start_apply(origin_startpos);
}
1573 :
1574 : /* Logical Replication Tablesync worker entry point */
1575 : void
1576 400 : TableSyncWorkerMain(Datum main_arg)
1577 : {
1578 400 : int worker_slot = DatumGetInt32(main_arg);
1579 :
1580 400 : SetupApplyOrSyncWorker(worker_slot);
1581 :
1582 398 : run_tablesync_worker();
1583 :
1584 0 : FinishSyncWorker();
1585 : }
1586 :
1587 : /*
1588 : * If the subscription has no tables then return false.
1589 : *
1590 : * Otherwise, are all tablesyncs READY?
1591 : *
1592 : * Note: This function is not suitable to be called from outside of apply or
1593 : * tablesync workers because MySubscription needs to be already initialized.
1594 : */
1595 : bool
1596 502 : AllTablesyncsReady(void)
1597 : {
1598 : bool started_tx;
1599 : bool has_tables;
1600 :
1601 : /* We need up-to-date sync state info for subscription tables here. */
1602 502 : FetchRelationStates(&has_tables, NULL, &started_tx);
1603 :
1604 502 : if (started_tx)
1605 : {
1606 32 : CommitTransactionCommand();
1607 32 : pgstat_report_stat(true);
1608 : }
1609 :
1610 : /*
1611 : * Return false when there are no tables in subscription or not all tables
1612 : * are in ready state; true otherwise.
1613 : */
1614 502 : return has_tables && (table_states_not_ready == NIL);
1615 : }
1616 :
1617 : /*
1618 : * Return whether the subscription currently has any tables.
1619 : *
1620 : * Note: Unlike HasSubscriptionTables(), this function relies on cached
1621 : * information for subscription tables. Additionally, it should not be
1622 : * invoked outside of apply or tablesync workers, as MySubscription must be
1623 : * initialized first.
1624 : */
1625 : bool
1626 340 : HasSubscriptionTablesCached(void)
1627 : {
1628 : bool started_tx;
1629 : bool has_tables;
1630 :
1631 : /* We need up-to-date subscription tables info here */
1632 340 : FetchRelationStates(&has_tables, NULL, &started_tx);
1633 :
1634 340 : if (started_tx)
1635 : {
1636 0 : CommitTransactionCommand();
1637 0 : pgstat_report_stat(true);
1638 : }
1639 :
1640 340 : return has_tables;
1641 : }
1642 :
1643 : /*
1644 : * Update the two_phase state of the specified subscription in pg_subscription.
1645 : */
1646 : void
1647 20 : UpdateTwoPhaseState(Oid suboid, char new_state)
1648 : {
1649 : Relation rel;
1650 : HeapTuple tup;
1651 : bool nulls[Natts_pg_subscription];
1652 : bool replaces[Natts_pg_subscription];
1653 : Datum values[Natts_pg_subscription];
1654 :
1655 : Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
1656 : new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1657 : new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1658 :
1659 20 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1660 20 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
1661 20 : if (!HeapTupleIsValid(tup))
1662 0 : elog(ERROR,
1663 : "cache lookup failed for subscription oid %u",
1664 : suboid);
1665 :
1666 : /* Form a new tuple. */
1667 20 : memset(values, 0, sizeof(values));
1668 20 : memset(nulls, false, sizeof(nulls));
1669 20 : memset(replaces, false, sizeof(replaces));
1670 :
1671 : /* And update/set two_phase state */
1672 20 : values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1673 20 : replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1674 :
1675 20 : tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1676 : values, nulls, replaces);
1677 20 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1678 :
1679 20 : heap_freetuple(tup);
1680 20 : table_close(rel, RowExclusiveLock);
1681 20 : }
|