Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * tablesync.c
3 : * PostgreSQL logical replication: initial table data synchronization
4 : *
5 : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/tablesync.c
9 : *
10 : * NOTES
11 : * This file contains code for initial table data synchronization for
12 : * logical replication.
13 : *
14 : * The initial data synchronization is done separately for each table,
15 : * in a separate apply worker that only fetches the initial snapshot data
16 : * from the publisher and then synchronizes the position in the stream with
17 : * the leader apply worker.
18 : *
19 : * There are several reasons for doing the synchronization this way:
20 : * - It allows us to parallelize the initial data synchronization
21 : * which lowers the time needed for it to happen.
22 : * - The initial synchronization does not have to hold the xid and LSN
23 : * for the time it takes to copy data of all tables, causing less
24 : * bloat and lower disk consumption compared to doing the
25 : * synchronization in a single process for the whole database.
26 : * - It allows us to synchronize any tables added after the initial
27 : * synchronization has finished.
28 : *
29 : * The stream position synchronization works in multiple steps:
30 : * - Apply worker requests a tablesync worker to start, setting the new
31 : * table state to INIT.
32 : * - Tablesync worker starts; changes table state from INIT to DATASYNC while
33 : * copying.
34 : * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
35 : * worker specific) state to indicate when the copy phase has completed, so
36 : * if the worker crashes with this (non-memory) state then the copy will not
37 : * be re-attempted.
38 : * - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
39 : * - Apply worker periodically checks for tables in SYNCWAIT state. When
40 : * any appear, it sets the table state to CATCHUP and starts loop-waiting
41 : * until either the table state is set to SYNCDONE or the sync worker
42 : * exits.
43 : * - After the sync worker has seen the state change to CATCHUP, it will
44 : * read the stream and apply changes (acting like an apply worker) until
45 : * it catches up to the specified stream position. Then it sets the
46 : * state to SYNCDONE. There might be zero changes applied between
47 : * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
48 : * apply worker.
49 : * - Once the state is set to SYNCDONE, the apply worker will continue
50 : * tracking the table until it reaches the SYNCDONE stream position, at
51 : * which point it sets the state to READY and stops tracking. Again,
52 : * there might be zero changes in between.
53 : *
54 : * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
55 : * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
56 : *
57 : * The catalog pg_subscription_rel is used to keep information about
58 : * subscribed tables and their state. The catalog holds all states
59 : * except SYNCWAIT and CATCHUP which are only in shared memory.
60 : *
61 : * Example flows look like this:
62 : * - Apply is in front:
63 : * sync:8
64 : * -> set in catalog FINISHEDCOPY
65 : * -> set in memory SYNCWAIT
66 : * apply:10
67 : * -> set in memory CATCHUP
68 : * -> enter wait-loop
69 : * sync:10
70 : * -> set in catalog SYNCDONE
71 : * -> exit
72 : * apply:10
73 : * -> exit wait-loop
74 : * -> continue rep
75 : * apply:11
76 : * -> set in catalog READY
77 : *
78 : * - Sync is in front:
79 : * sync:10
80 : * -> set in catalog FINISHEDCOPY
81 : * -> set in memory SYNCWAIT
82 : * apply:8
83 : * -> set in memory CATCHUP
84 : * -> continue per-table filtering
85 : * sync:10
86 : * -> set in catalog SYNCDONE
87 : * -> exit
88 : * apply:10
89 : * -> set in catalog READY
90 : * -> stop per-table filtering
91 : * -> continue rep
92 : *-------------------------------------------------------------------------
93 : */
94 :
95 : #include "postgres.h"
96 :
97 : #include "access/table.h"
98 : #include "access/xact.h"
99 : #include "catalog/indexing.h"
100 : #include "catalog/pg_subscription_rel.h"
101 : #include "catalog/pg_type.h"
102 : #include "commands/copy.h"
103 : #include "miscadmin.h"
104 : #include "nodes/makefuncs.h"
105 : #include "parser/parse_relation.h"
106 : #include "pgstat.h"
107 : #include "replication/logicallauncher.h"
108 : #include "replication/logicalrelation.h"
109 : #include "replication/logicalworker.h"
110 : #include "replication/origin.h"
111 : #include "replication/slot.h"
112 : #include "replication/walreceiver.h"
113 : #include "replication/worker_internal.h"
114 : #include "storage/ipc.h"
115 : #include "storage/lmgr.h"
116 : #include "utils/acl.h"
117 : #include "utils/array.h"
118 : #include "utils/builtins.h"
119 : #include "utils/lsyscache.h"
120 : #include "utils/memutils.h"
121 : #include "utils/rls.h"
122 : #include "utils/snapmgr.h"
123 : #include "utils/syscache.h"
124 : #include "utils/usercontext.h"
125 :
126 : typedef enum
127 : {
128 : SYNC_TABLE_STATE_NEEDS_REBUILD,
129 : SYNC_TABLE_STATE_REBUILD_STARTED,
130 : SYNC_TABLE_STATE_VALID,
131 : } SyncingTablesState;
132 :
133 : static SyncingTablesState table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
134 : static List *table_states_not_ready = NIL;
135 : static bool FetchTableStates(bool *started_tx);
136 :
137 : static StringInfo copybuf = NULL;
138 :
139 : /*
140 : * Exit routine for synchronization worker.
141 : */
142 : pg_noreturn static void
143 364 : finish_sync_worker(void)
144 : {
145 : /*
146 : * Commit any outstanding transaction. This is the usual case, unless
147 : * there was nothing to do for the table.
148 : */
149 364 : if (IsTransactionState())
150 : {
151 364 : CommitTransactionCommand();
152 364 : pgstat_report_stat(true);
153 : }
154 :
155 : /* And flush all writes. */
156 364 : XLogFlush(GetXLogWriteRecPtr());
157 :
158 364 : StartTransactionCommand();
159 364 : ereport(LOG,
160 : (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
161 : MySubscription->name,
162 : get_rel_name(MyLogicalRepWorker->relid))));
163 364 : CommitTransactionCommand();
164 :
165 : /* Find the leader apply worker and signal it. */
166 364 : logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
167 :
168 : /* Stop gracefully */
169 364 : proc_exit(0);
170 : }
171 :
172 : /*
173 : * Wait until the relation sync state is set in the catalog to the expected
174 : * one; return true when it happens.
175 : *
176 : * Returns false if the table sync worker or the table itself have
177 : * disappeared, or the table state has been reset.
178 : *
179 : * Currently, this is used in the apply worker when transitioning from
180 : * CATCHUP state to SYNCDONE.
181 : */
182 : static bool
183 360 : wait_for_relation_state_change(Oid relid, char expected_state)
184 : {
185 : char state;
186 :
187 : for (;;)
188 432 : {
189 : LogicalRepWorker *worker;
190 : XLogRecPtr statelsn;
191 :
192 792 : CHECK_FOR_INTERRUPTS();
193 :
194 792 : InvalidateCatalogSnapshot();
195 792 : state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
196 : relid, &statelsn);
197 :
198 792 : if (state == SUBREL_STATE_UNKNOWN)
199 0 : break;
200 :
201 792 : if (state == expected_state)
202 360 : return true;
203 :
204 : /* Check if the sync worker is still running and bail if not. */
205 432 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
206 432 : worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
207 : false);
208 432 : LWLockRelease(LogicalRepWorkerLock);
209 432 : if (!worker)
210 0 : break;
211 :
212 432 : (void) WaitLatch(MyLatch,
213 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
214 : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
215 :
216 432 : ResetLatch(MyLatch);
217 : }
218 :
219 0 : return false;
220 : }
221 :
222 : /*
223 : * Wait until the apply worker changes the state of our synchronization
224 : * worker to the expected one.
225 : *
226 : * Used when transitioning from SYNCWAIT state to CATCHUP.
227 : *
228 : * Returns false if the apply worker has disappeared.
229 : */
230 : static bool
231 364 : wait_for_worker_state_change(char expected_state)
232 : {
233 : int rc;
234 :
235 : for (;;)
236 364 : {
237 : LogicalRepWorker *worker;
238 :
239 728 : CHECK_FOR_INTERRUPTS();
240 :
241 : /*
242 : * Done if already in correct state. (We assume this fetch is atomic
243 : * enough to not give a misleading answer if we do it with no lock.)
244 : */
245 728 : if (MyLogicalRepWorker->relstate == expected_state)
246 364 : return true;
247 :
248 : /*
249 : * Bail out if the apply worker has died, else signal it we're
250 : * waiting.
251 : */
252 364 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
253 364 : worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
254 : InvalidOid, false);
255 364 : if (worker && worker->proc)
256 364 : logicalrep_worker_wakeup_ptr(worker);
257 364 : LWLockRelease(LogicalRepWorkerLock);
258 364 : if (!worker)
259 0 : break;
260 :
261 : /*
262 : * Wait. We expect to get a latch signal back from the apply worker,
263 : * but use a timeout in case it dies without sending one.
264 : */
265 364 : rc = WaitLatch(MyLatch,
266 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
267 : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
268 :
269 364 : if (rc & WL_LATCH_SET)
270 364 : ResetLatch(MyLatch);
271 : }
272 :
273 0 : return false;
274 : }
275 :
276 : /*
277 : * Callback from syscache invalidation.
278 : */
279 : void
280 3452 : invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
281 : {
282 3452 : table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
283 3452 : }
284 :
285 : /*
286 : * Handle table synchronization cooperation from the synchronization
287 : * worker.
288 : *
289 : * If the sync worker is in CATCHUP state and reached (or passed) the
290 : * predetermined synchronization point in the WAL stream, mark the table as
291 : * SYNCDONE and finish.
292 : */
293 : static void
294 454 : process_syncing_tables_for_sync(XLogRecPtr current_lsn)
295 : {
296 454 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
297 :
298 454 : if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
299 454 : current_lsn >= MyLogicalRepWorker->relstate_lsn)
300 : {
301 : TimeLineID tli;
302 364 : char syncslotname[NAMEDATALEN] = {0};
303 364 : char originname[NAMEDATALEN] = {0};
304 :
305 364 : MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
306 364 : MyLogicalRepWorker->relstate_lsn = current_lsn;
307 :
308 364 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
309 :
310 : /*
311 : * UpdateSubscriptionRelState must be called within a transaction.
312 : */
313 364 : if (!IsTransactionState())
314 364 : StartTransactionCommand();
315 :
316 364 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
317 364 : MyLogicalRepWorker->relid,
318 364 : MyLogicalRepWorker->relstate,
319 364 : MyLogicalRepWorker->relstate_lsn);
320 :
321 : /*
322 : * End streaming so that LogRepWorkerWalRcvConn can be used to drop
323 : * the slot.
324 : */
325 364 : walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
326 :
327 : /*
328 : * Cleanup the tablesync slot.
329 : *
330 : * This has to be done after updating the state because otherwise if
331 : * there is an error while doing the database operations we won't be
332 : * able to rollback dropped slot.
333 : */
334 364 : ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
335 364 : MyLogicalRepWorker->relid,
336 : syncslotname,
337 : sizeof(syncslotname));
338 :
339 : /*
340 : * It is important to give an error if we are unable to drop the slot,
341 : * otherwise, it won't be dropped till the corresponding subscription
342 : * is dropped. So passing missing_ok = false.
343 : */
344 364 : ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
345 :
346 364 : CommitTransactionCommand();
347 364 : pgstat_report_stat(false);
348 :
349 : /*
350 : * Start a new transaction to clean up the tablesync origin tracking.
351 : * This transaction will be ended within the finish_sync_worker().
352 : * Now, even, if we fail to remove this here, the apply worker will
353 : * ensure to clean it up afterward.
354 : *
355 : * We need to do this after the table state is set to SYNCDONE.
356 : * Otherwise, if an error occurs while performing the database
357 : * operation, the worker will be restarted and the in-memory state of
358 : * replication progress (remote_lsn) won't be rolled-back which would
359 : * have been cleared before restart. So, the restarted worker will use
360 : * invalid replication progress state resulting in replay of
361 : * transactions that have already been applied.
362 : */
363 364 : StartTransactionCommand();
364 :
365 364 : ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
366 364 : MyLogicalRepWorker->relid,
367 : originname,
368 : sizeof(originname));
369 :
370 : /*
371 : * Resetting the origin session removes the ownership of the slot.
372 : * This is needed to allow the origin to be dropped.
373 : */
374 364 : replorigin_session_reset();
375 364 : replorigin_session_origin = InvalidRepOriginId;
376 364 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
377 364 : replorigin_session_origin_timestamp = 0;
378 :
379 : /*
380 : * Drop the tablesync's origin tracking if exists.
381 : *
382 : * There is a chance that the user is concurrently performing refresh
383 : * for the subscription where we remove the table state and its origin
384 : * or the apply worker would have removed this origin. So passing
385 : * missing_ok = true.
386 : */
387 364 : replorigin_drop_by_name(originname, true, false);
388 :
389 364 : finish_sync_worker();
390 : }
391 : else
392 90 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
393 90 : }
394 :
395 : /*
396 : * Handle table synchronization cooperation from the apply worker.
397 : *
398 : * Walk over all subscription tables that are individually tracked by the
399 : * apply process (currently, all that have state other than
400 : * SUBREL_STATE_READY) and manage synchronization for them.
401 : *
402 : * If there are tables that need synchronizing and are not being synchronized
403 : * yet, start sync workers for them (if there are free slots for sync
404 : * workers). To prevent starting the sync worker for the same relation at a
405 : * high frequency after a failure, we store its last start time with each sync
406 : * state info. We start the sync worker for the same relation after waiting
407 : * at least wal_retrieve_retry_interval.
408 : *
409 : * For tables that are being synchronized already, check if sync workers
410 : * either need action from the apply worker or have finished. This is the
411 : * SYNCWAIT to CATCHUP transition.
412 : *
413 : * If the synchronization position is reached (SYNCDONE), then the table can
414 : * be marked as READY and is no longer tracked.
415 : */
416 : static void
417 11204 : process_syncing_tables_for_apply(XLogRecPtr current_lsn)
418 : {
419 : struct tablesync_start_time_mapping
420 : {
421 : Oid relid;
422 : TimestampTz last_start_time;
423 : };
424 : static HTAB *last_start_times = NULL;
425 : ListCell *lc;
426 11204 : bool started_tx = false;
427 11204 : bool should_exit = false;
428 :
429 : Assert(!IsTransactionState());
430 :
431 : /* We need up-to-date sync state info for subscription tables here. */
432 11204 : FetchTableStates(&started_tx);
433 :
434 : /*
435 : * Prepare a hash table for tracking last start times of workers, to avoid
436 : * immediate restarts. We don't need it if there are no tables that need
437 : * syncing.
438 : */
439 11204 : if (table_states_not_ready != NIL && !last_start_times)
440 238 : {
441 : HASHCTL ctl;
442 :
443 238 : ctl.keysize = sizeof(Oid);
444 238 : ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
445 238 : last_start_times = hash_create("Logical replication table sync worker start times",
446 : 256, &ctl, HASH_ELEM | HASH_BLOBS);
447 : }
448 :
449 : /*
450 : * Clean up the hash table when we're done with all tables (just to
451 : * release the bit of memory).
452 : */
453 10966 : else if (table_states_not_ready == NIL && last_start_times)
454 : {
455 176 : hash_destroy(last_start_times);
456 176 : last_start_times = NULL;
457 : }
458 :
459 : /*
460 : * Process all tables that are being synchronized.
461 : */
462 14486 : foreach(lc, table_states_not_ready)
463 : {
464 3284 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
465 :
466 3284 : if (rstate->state == SUBREL_STATE_SYNCDONE)
467 : {
468 : /*
469 : * Apply has caught up to the position where the table sync has
470 : * finished. Mark the table as ready so that the apply will just
471 : * continue to replicate it normally.
472 : */
473 360 : if (current_lsn >= rstate->lsn)
474 : {
475 : char originname[NAMEDATALEN];
476 :
477 358 : rstate->state = SUBREL_STATE_READY;
478 358 : rstate->lsn = current_lsn;
479 358 : if (!started_tx)
480 : {
481 14 : StartTransactionCommand();
482 14 : started_tx = true;
483 : }
484 :
485 : /*
486 : * Remove the tablesync origin tracking if exists.
487 : *
488 : * There is a chance that the user is concurrently performing
489 : * refresh for the subscription where we remove the table
490 : * state and its origin or the tablesync worker would have
491 : * already removed this origin. We can't rely on tablesync
492 : * worker to remove the origin tracking as if there is any
493 : * error while dropping we won't restart it to drop the
494 : * origin. So passing missing_ok = true.
495 : */
496 358 : ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
497 : rstate->relid,
498 : originname,
499 : sizeof(originname));
500 358 : replorigin_drop_by_name(originname, true, false);
501 :
502 : /*
503 : * Update the state to READY only after the origin cleanup.
504 : */
505 358 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
506 358 : rstate->relid, rstate->state,
507 : rstate->lsn);
508 : }
509 : }
510 : else
511 : {
512 : LogicalRepWorker *syncworker;
513 :
514 : /*
515 : * Look for a sync worker for this relation.
516 : */
517 2924 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
518 :
519 2924 : syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
520 : rstate->relid, false);
521 :
522 2924 : if (syncworker)
523 : {
524 : /* Found one, update our copy of its state */
525 1338 : SpinLockAcquire(&syncworker->relmutex);
526 1338 : rstate->state = syncworker->relstate;
527 1338 : rstate->lsn = syncworker->relstate_lsn;
528 1338 : if (rstate->state == SUBREL_STATE_SYNCWAIT)
529 : {
530 : /*
531 : * Sync worker is waiting for apply. Tell sync worker it
532 : * can catchup now.
533 : */
534 360 : syncworker->relstate = SUBREL_STATE_CATCHUP;
535 360 : syncworker->relstate_lsn =
536 360 : Max(syncworker->relstate_lsn, current_lsn);
537 : }
538 1338 : SpinLockRelease(&syncworker->relmutex);
539 :
540 : /* If we told worker to catch up, wait for it. */
541 1338 : if (rstate->state == SUBREL_STATE_SYNCWAIT)
542 : {
543 : /* Signal the sync worker, as it may be waiting for us. */
544 360 : if (syncworker->proc)
545 360 : logicalrep_worker_wakeup_ptr(syncworker);
546 :
547 : /* Now safe to release the LWLock */
548 360 : LWLockRelease(LogicalRepWorkerLock);
549 :
550 360 : if (started_tx)
551 : {
552 : /*
553 : * We must commit the existing transaction to release
554 : * the existing locks before entering a busy loop.
555 : * This is required to avoid any undetected deadlocks
556 : * due to any existing lock as deadlock detector won't
557 : * be able to detect the waits on the latch.
558 : */
559 360 : CommitTransactionCommand();
560 360 : pgstat_report_stat(false);
561 : }
562 :
563 : /*
564 : * Enter busy loop and wait for synchronization worker to
565 : * reach expected state (or die trying).
566 : */
567 360 : StartTransactionCommand();
568 360 : started_tx = true;
569 :
570 360 : wait_for_relation_state_change(rstate->relid,
571 : SUBREL_STATE_SYNCDONE);
572 : }
573 : else
574 978 : LWLockRelease(LogicalRepWorkerLock);
575 : }
576 : else
577 : {
578 : /*
579 : * If there is no sync worker for this table yet, count
580 : * running sync workers for this subscription, while we have
581 : * the lock.
582 : */
583 : int nsyncworkers =
584 1586 : logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
585 :
586 : /* Now safe to release the LWLock */
587 1586 : LWLockRelease(LogicalRepWorkerLock);
588 :
589 : /*
590 : * If there are free sync worker slot(s), start a new sync
591 : * worker for the table.
592 : */
593 1586 : if (nsyncworkers < max_sync_workers_per_subscription)
594 : {
595 406 : TimestampTz now = GetCurrentTimestamp();
596 : struct tablesync_start_time_mapping *hentry;
597 : bool found;
598 :
599 406 : hentry = hash_search(last_start_times, &rstate->relid,
600 : HASH_ENTER, &found);
601 :
602 436 : if (!found ||
603 30 : TimestampDifferenceExceeds(hentry->last_start_time, now,
604 : wal_retrieve_retry_interval))
605 : {
606 : /*
607 : * Set the last_start_time even if we fail to start
608 : * the worker, so that we won't retry until
609 : * wal_retrieve_retry_interval has elapsed.
610 : */
611 384 : hentry->last_start_time = now;
612 384 : (void) logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
613 384 : MyLogicalRepWorker->dbid,
614 384 : MySubscription->oid,
615 384 : MySubscription->name,
616 384 : MyLogicalRepWorker->userid,
617 : rstate->relid,
618 : DSM_HANDLE_INVALID);
619 : }
620 : }
621 : }
622 : }
623 : }
624 :
625 11202 : if (started_tx)
626 : {
627 : /*
628 : * Even when the two_phase mode is requested by the user, it remains
629 : * as 'pending' until all tablesyncs have reached READY state.
630 : *
631 : * When this happens, we restart the apply worker and (if the
632 : * conditions are still ok) then the two_phase tri-state will become
633 : * 'enabled' at that time.
634 : *
635 : * Note: If the subscription has no tables then leave the state as
636 : * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
637 : * work.
638 : */
639 1634 : if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
640 : {
641 50 : CommandCounterIncrement(); /* make updates visible */
642 50 : if (AllTablesyncsReady())
643 : {
644 12 : ereport(LOG,
645 : (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
646 : MySubscription->name)));
647 12 : should_exit = true;
648 : }
649 : }
650 :
651 1634 : CommitTransactionCommand();
652 1634 : pgstat_report_stat(true);
653 : }
654 :
655 11202 : if (should_exit)
656 : {
657 : /*
658 : * Reset the last-start time for this worker so that the launcher will
659 : * restart it without waiting for wal_retrieve_retry_interval.
660 : */
661 12 : ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
662 :
663 12 : proc_exit(0);
664 : }
665 11190 : }
666 :
667 : /*
668 : * Process possible state change(s) of tables that are being synchronized.
669 : */
670 : void
671 11702 : process_syncing_tables(XLogRecPtr current_lsn)
672 : {
673 11702 : switch (MyLogicalRepWorker->type)
674 : {
675 44 : case WORKERTYPE_PARALLEL_APPLY:
676 :
677 : /*
678 : * Skip for parallel apply workers because they only operate on
679 : * tables that are in a READY state. See pa_can_start() and
680 : * should_apply_changes_for_rel().
681 : */
682 44 : break;
683 :
684 454 : case WORKERTYPE_TABLESYNC:
685 454 : process_syncing_tables_for_sync(current_lsn);
686 90 : break;
687 :
688 11204 : case WORKERTYPE_APPLY:
689 11204 : process_syncing_tables_for_apply(current_lsn);
690 11190 : break;
691 :
692 0 : case WORKERTYPE_UNKNOWN:
693 : /* Should never happen. */
694 0 : elog(ERROR, "Unknown worker type");
695 : }
696 11324 : }
697 :
698 : /*
699 : * Create list of columns for COPY based on logical relation mapping.
700 : */
701 : static List *
702 380 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
703 : {
704 380 : List *attnamelist = NIL;
705 : int i;
706 :
707 1020 : for (i = 0; i < rel->remoterel.natts; i++)
708 : {
709 640 : attnamelist = lappend(attnamelist,
710 640 : makeString(rel->remoterel.attnames[i]));
711 : }
712 :
713 :
714 380 : return attnamelist;
715 : }
716 :
717 : /*
718 : * Data source callback for the COPY FROM, which reads from the remote
719 : * connection and passes the data back to our local COPY.
720 : */
721 : static int
722 27962 : copy_read_data(void *outbuf, int minread, int maxread)
723 : {
724 27962 : int bytesread = 0;
725 : int avail;
726 :
727 : /* If there are some leftover data from previous read, use it. */
728 27962 : avail = copybuf->len - copybuf->cursor;
729 27962 : if (avail)
730 : {
731 0 : if (avail > maxread)
732 0 : avail = maxread;
733 0 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
734 0 : copybuf->cursor += avail;
735 0 : maxread -= avail;
736 0 : bytesread += avail;
737 : }
738 :
739 27964 : while (maxread > 0 && bytesread < minread)
740 : {
741 27964 : pgsocket fd = PGINVALID_SOCKET;
742 : int len;
743 27964 : char *buf = NULL;
744 :
745 : for (;;)
746 : {
747 : /* Try read the data. */
748 27964 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
749 :
750 27964 : CHECK_FOR_INTERRUPTS();
751 :
752 27964 : if (len == 0)
753 2 : break;
754 27962 : else if (len < 0)
755 27962 : return bytesread;
756 : else
757 : {
758 : /* Process the data */
759 27586 : copybuf->data = buf;
760 27586 : copybuf->len = len;
761 27586 : copybuf->cursor = 0;
762 :
763 27586 : avail = copybuf->len - copybuf->cursor;
764 27586 : if (avail > maxread)
765 0 : avail = maxread;
766 27586 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
767 27586 : outbuf = (char *) outbuf + avail;
768 27586 : copybuf->cursor += avail;
769 27586 : maxread -= avail;
770 27586 : bytesread += avail;
771 : }
772 :
773 27586 : if (maxread <= 0 || bytesread >= minread)
774 27586 : return bytesread;
775 : }
776 :
777 : /*
778 : * Wait for more data or latch.
779 : */
780 2 : (void) WaitLatchOrSocket(MyLatch,
781 : WL_SOCKET_READABLE | WL_LATCH_SET |
782 : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
783 : fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
784 :
785 2 : ResetLatch(MyLatch);
786 : }
787 :
788 0 : return bytesread;
789 : }
790 :
791 :
792 : /*
793 : * Get information about remote relation in similar fashion the RELATION
794 : * message provides during replication.
795 : *
796 : * This function also returns (a) the relation qualifications to be used in
797 : * the COPY command, and (b) whether the remote relation has published any
798 : * generated column.
799 : */
800 : static void
801 386 : fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
802 : List **qual, bool *gencol_published)
803 : {
804 : WalRcvExecResult *res;
805 : StringInfoData cmd;
806 : TupleTableSlot *slot;
807 386 : Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
808 386 : Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
809 386 : Oid qualRow[] = {TEXTOID};
810 : bool isnull;
811 : int natt;
812 386 : StringInfo pub_names = NULL;
813 386 : Bitmapset *included_cols = NULL;
814 386 : int server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
815 :
816 386 : lrel->nspname = nspname;
817 386 : lrel->relname = relname;
818 :
819 : /* First fetch Oid and replica identity. */
820 386 : initStringInfo(&cmd);
821 386 : appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
822 : " FROM pg_catalog.pg_class c"
823 : " INNER JOIN pg_catalog.pg_namespace n"
824 : " ON (c.relnamespace = n.oid)"
825 : " WHERE n.nspname = %s"
826 : " AND c.relname = %s",
827 : quote_literal_cstr(nspname),
828 : quote_literal_cstr(relname));
829 386 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
830 : lengthof(tableRow), tableRow);
831 :
832 386 : if (res->status != WALRCV_OK_TUPLES)
833 0 : ereport(ERROR,
834 : (errcode(ERRCODE_CONNECTION_FAILURE),
835 : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
836 : nspname, relname, res->err)));
837 :
838 386 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
839 386 : if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
840 2 : ereport(ERROR,
841 : (errcode(ERRCODE_UNDEFINED_OBJECT),
842 : errmsg("table \"%s.%s\" not found on publisher",
843 : nspname, relname)));
844 :
845 384 : lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
846 : Assert(!isnull);
847 384 : lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
848 : Assert(!isnull);
849 384 : lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
850 : Assert(!isnull);
851 :
852 384 : ExecDropSingleTupleTableSlot(slot);
853 384 : walrcv_clear_result(res);
854 :
855 :
856 : /*
857 : * Get column lists for each relation.
858 : *
859 : * We need to do this before fetching info about column names and types,
860 : * so that we can skip columns that should not be replicated.
861 : */
862 384 : if (server_version >= 150000)
863 : {
864 : WalRcvExecResult *pubres;
865 : TupleTableSlot *tslot;
866 384 : Oid attrsRow[] = {INT2VECTOROID};
867 :
868 : /* Build the pub_names comma-separated string. */
869 384 : pub_names = makeStringInfo();
870 384 : GetPublicationsStr(MySubscription->publications, pub_names, true);
871 :
872 : /*
873 : * Fetch info about column lists for the relation (from all the
874 : * publications).
875 : */
876 384 : resetStringInfo(&cmd);
877 384 : appendStringInfo(&cmd,
878 : "SELECT DISTINCT"
879 : " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
880 : " THEN NULL ELSE gpt.attrs END)"
881 : " FROM pg_publication p,"
882 : " LATERAL pg_get_publication_tables(p.pubname) gpt,"
883 : " pg_class c"
884 : " WHERE gpt.relid = %u AND c.oid = gpt.relid"
885 : " AND p.pubname IN ( %s )",
886 : lrel->remoteid,
887 : pub_names->data);
888 :
889 384 : pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
890 : lengthof(attrsRow), attrsRow);
891 :
892 384 : if (pubres->status != WALRCV_OK_TUPLES)
893 0 : ereport(ERROR,
894 : (errcode(ERRCODE_CONNECTION_FAILURE),
895 : errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
896 : nspname, relname, pubres->err)));
897 :
898 : /*
899 : * We don't support the case where the column list is different for
900 : * the same table when combining publications. See comments atop
901 : * fetch_table_list. So there should be only one row returned.
902 : * Although we already checked this when creating the subscription, we
903 : * still need to check here in case the column list was changed after
904 : * creating the subscription and before the sync worker is started.
905 : */
906 384 : if (tuplestore_tuple_count(pubres->tuplestore) > 1)
907 0 : ereport(ERROR,
908 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
909 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
910 : nspname, relname));
911 :
912 : /*
913 : * Get the column list and build a single bitmap with the attnums.
914 : *
915 : * If we find a NULL value, it means all the columns should be
916 : * replicated.
917 : */
918 384 : tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
919 384 : if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
920 : {
921 384 : Datum cfval = slot_getattr(tslot, 1, &isnull);
922 :
923 384 : if (!isnull)
924 : {
925 : ArrayType *arr;
926 : int nelems;
927 : int16 *elems;
928 :
929 44 : arr = DatumGetArrayTypeP(cfval);
930 44 : nelems = ARR_DIMS(arr)[0];
931 44 : elems = (int16 *) ARR_DATA_PTR(arr);
932 :
933 118 : for (natt = 0; natt < nelems; natt++)
934 74 : included_cols = bms_add_member(included_cols, elems[natt]);
935 : }
936 :
937 384 : ExecClearTuple(tslot);
938 : }
939 384 : ExecDropSingleTupleTableSlot(tslot);
940 :
941 384 : walrcv_clear_result(pubres);
942 : }
943 :
944 : /*
945 : * Now fetch column names and types.
946 : */
947 384 : resetStringInfo(&cmd);
948 384 : appendStringInfoString(&cmd,
949 : "SELECT a.attnum,"
950 : " a.attname,"
951 : " a.atttypid,"
952 : " a.attnum = ANY(i.indkey)");
953 :
954 : /* Generated columns can be replicated since version 18. */
955 384 : if (server_version >= 180000)
956 384 : appendStringInfoString(&cmd, ", a.attgenerated != ''");
957 :
958 768 : appendStringInfo(&cmd,
959 : " FROM pg_catalog.pg_attribute a"
960 : " LEFT JOIN pg_catalog.pg_index i"
961 : " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
962 : " WHERE a.attnum > 0::pg_catalog.int2"
963 : " AND NOT a.attisdropped %s"
964 : " AND a.attrelid = %u"
965 : " ORDER BY a.attnum",
966 : lrel->remoteid,
967 384 : (server_version >= 120000 && server_version < 180000 ?
968 : "AND a.attgenerated = ''" : ""),
969 : lrel->remoteid);
970 384 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
971 : server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
972 :
973 384 : if (res->status != WALRCV_OK_TUPLES)
974 0 : ereport(ERROR,
975 : (errcode(ERRCODE_CONNECTION_FAILURE),
976 : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
977 : nspname, relname, res->err)));
978 :
979 : /* We don't know the number of rows coming, so allocate enough space. */
980 384 : lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
981 384 : lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
982 384 : lrel->attkeys = NULL;
983 :
984 : /*
985 : * Store the columns as a list of names. Ignore those that are not
986 : * present in the column list, if there is one.
987 : */
988 384 : natt = 0;
989 384 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
990 1098 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
991 : {
992 : char *rel_colname;
993 : AttrNumber attnum;
994 :
995 714 : attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
996 : Assert(!isnull);
997 :
998 : /* If the column is not in the column list, skip it. */
999 714 : if (included_cols != NULL && !bms_is_member(attnum, included_cols))
1000 : {
1001 62 : ExecClearTuple(slot);
1002 62 : continue;
1003 : }
1004 :
1005 652 : rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
1006 : Assert(!isnull);
1007 :
1008 652 : lrel->attnames[natt] = rel_colname;
1009 652 : lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
1010 : Assert(!isnull);
1011 :
1012 652 : if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
1013 212 : lrel->attkeys = bms_add_member(lrel->attkeys, natt);
1014 :
1015 : /* Remember if the remote table has published any generated column. */
1016 652 : if (server_version >= 180000 && !(*gencol_published))
1017 : {
1018 652 : *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
1019 : Assert(!isnull);
1020 : }
1021 :
1022 : /* Should never happen. */
1023 652 : if (++natt >= MaxTupleAttributeNumber)
1024 0 : elog(ERROR, "too many columns in remote table \"%s.%s\"",
1025 : nspname, relname);
1026 :
1027 652 : ExecClearTuple(slot);
1028 : }
1029 384 : ExecDropSingleTupleTableSlot(slot);
1030 :
1031 384 : lrel->natts = natt;
1032 :
1033 384 : walrcv_clear_result(res);
1034 :
1035 : /*
1036 : * Get relation's row filter expressions. DISTINCT avoids the same
1037 : * expression of a table in multiple publications from being included
1038 : * multiple times in the final expression.
1039 : *
1040 : * We need to copy the row even if it matches just one of the
1041 : * publications, so we later combine all the quals with OR.
1042 : *
1043 : * For initial synchronization, row filtering can be ignored in following
1044 : * cases:
1045 : *
1046 : * 1) one of the subscribed publications for the table hasn't specified
1047 : * any row filter
1048 : *
1049 : * 2) one of the subscribed publications has puballtables set to true
1050 : *
1051 : * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
1052 : * that includes this relation
1053 : */
1054 384 : if (server_version >= 150000)
1055 : {
1056 : /* Reuse the already-built pub_names. */
1057 : Assert(pub_names != NULL);
1058 :
1059 : /* Check for row filters. */
1060 384 : resetStringInfo(&cmd);
1061 384 : appendStringInfo(&cmd,
1062 : "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
1063 : " FROM pg_publication p,"
1064 : " LATERAL pg_get_publication_tables(p.pubname) gpt"
1065 : " WHERE gpt.relid = %u"
1066 : " AND p.pubname IN ( %s )",
1067 : lrel->remoteid,
1068 : pub_names->data);
1069 :
1070 384 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
1071 :
1072 384 : if (res->status != WALRCV_OK_TUPLES)
1073 0 : ereport(ERROR,
1074 : (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
1075 : nspname, relname, res->err)));
1076 :
1077 : /*
1078 : * Multiple row filter expressions for the same table will be combined
1079 : * by COPY using OR. If any of the filter expressions for this table
1080 : * are null, it means the whole table will be copied. In this case it
1081 : * is not necessary to construct a unified row filter expression at
1082 : * all.
1083 : */
1084 384 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
1085 414 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1086 : {
1087 392 : Datum rf = slot_getattr(slot, 1, &isnull);
1088 :
1089 392 : if (!isnull)
1090 30 : *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
1091 : else
1092 : {
1093 : /* Ignore filters and cleanup as necessary. */
1094 362 : if (*qual)
1095 : {
1096 6 : list_free_deep(*qual);
1097 6 : *qual = NIL;
1098 : }
1099 362 : break;
1100 : }
1101 :
1102 30 : ExecClearTuple(slot);
1103 : }
1104 384 : ExecDropSingleTupleTableSlot(slot);
1105 :
1106 384 : walrcv_clear_result(res);
1107 384 : destroyStringInfo(pub_names);
1108 : }
1109 :
1110 384 : pfree(cmd.data);
1111 384 : }
1112 :
1113 : /*
1114 : * Copy existing data of a table from publisher.
1115 : *
1116 : * Caller is responsible for locking the local relation.
1117 : */
1118 : static void
1119 386 : copy_table(Relation rel)
1120 : {
1121 : LogicalRepRelMapEntry *relmapentry;
1122 : LogicalRepRelation lrel;
1123 386 : List *qual = NIL;
1124 : WalRcvExecResult *res;
1125 : StringInfoData cmd;
1126 : CopyFromState cstate;
1127 : List *attnamelist;
1128 : ParseState *pstate;
1129 386 : List *options = NIL;
1130 386 : bool gencol_published = false;
1131 :
1132 : /* Get the publisher relation info. */
1133 386 : fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
1134 386 : RelationGetRelationName(rel), &lrel, &qual,
1135 : &gencol_published);
1136 :
1137 : /* Put the relation into relmap. */
1138 384 : logicalrep_relmap_update(&lrel);
1139 :
1140 : /* Map the publisher relation to local one. */
1141 384 : relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
1142 : Assert(rel == relmapentry->localrel);
1143 :
1144 : /* Start copy on the publisher. */
1145 380 : initStringInfo(&cmd);
1146 :
1147 : /* Regular table with no row filter or generated columns */
1148 380 : if (lrel.relkind == RELKIND_RELATION && qual == NIL && !gencol_published)
1149 : {
1150 324 : appendStringInfo(&cmd, "COPY %s",
1151 324 : quote_qualified_identifier(lrel.nspname, lrel.relname));
1152 :
1153 : /* If the table has columns, then specify the columns */
1154 324 : if (lrel.natts)
1155 : {
1156 322 : appendStringInfoString(&cmd, " (");
1157 :
1158 : /*
1159 : * XXX Do we need to list the columns in all cases? Maybe we're
1160 : * replicating all columns?
1161 : */
1162 878 : for (int i = 0; i < lrel.natts; i++)
1163 : {
1164 556 : if (i > 0)
1165 234 : appendStringInfoString(&cmd, ", ");
1166 :
1167 556 : appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
1168 : }
1169 :
1170 322 : appendStringInfoChar(&cmd, ')');
1171 : }
1172 :
1173 324 : appendStringInfoString(&cmd, " TO STDOUT");
1174 : }
1175 : else
1176 : {
1177 : /*
1178 : * For non-tables and tables with row filters, we need to do COPY
1179 : * (SELECT ...), but we can't just do SELECT * because we may need to
1180 : * copy only subset of columns including generated columns. For tables
1181 : * with any row filters, build a SELECT query with OR'ed row filters
1182 : * for COPY.
1183 : *
1184 : * We also need to use this same COPY (SELECT ...) syntax when
1185 : * generated columns are published, because copy of generated columns
1186 : * is not supported by the normal COPY.
1187 : */
1188 56 : appendStringInfoString(&cmd, "COPY (SELECT ");
1189 140 : for (int i = 0; i < lrel.natts; i++)
1190 : {
1191 84 : appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
1192 84 : if (i < lrel.natts - 1)
1193 28 : appendStringInfoString(&cmd, ", ");
1194 : }
1195 :
1196 56 : appendStringInfoString(&cmd, " FROM ");
1197 :
1198 : /*
1199 : * For regular tables, make sure we don't copy data from a child that
1200 : * inherits the named table as those will be copied separately.
1201 : */
1202 56 : if (lrel.relkind == RELKIND_RELATION)
1203 22 : appendStringInfoString(&cmd, "ONLY ");
1204 :
1205 56 : appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
1206 : /* list of OR'ed filters */
1207 56 : if (qual != NIL)
1208 : {
1209 : ListCell *lc;
1210 22 : char *q = strVal(linitial(qual));
1211 :
1212 22 : appendStringInfo(&cmd, " WHERE %s", q);
1213 24 : for_each_from(lc, qual, 1)
1214 : {
1215 2 : q = strVal(lfirst(lc));
1216 2 : appendStringInfo(&cmd, " OR %s", q);
1217 : }
1218 22 : list_free_deep(qual);
1219 : }
1220 :
1221 56 : appendStringInfoString(&cmd, ") TO STDOUT");
1222 : }
1223 :
1224 : /*
1225 : * Prior to v16, initial table synchronization will use text format even
1226 : * if the binary option is enabled for a subscription.
1227 : */
1228 380 : if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
1229 380 : MySubscription->binary)
1230 : {
1231 10 : appendStringInfoString(&cmd, " WITH (FORMAT binary)");
1232 10 : options = list_make1(makeDefElem("format",
1233 : (Node *) makeString("binary"), -1));
1234 : }
1235 :
1236 380 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
1237 380 : pfree(cmd.data);
1238 380 : if (res->status != WALRCV_OK_COPY_OUT)
1239 0 : ereport(ERROR,
1240 : (errcode(ERRCODE_CONNECTION_FAILURE),
1241 : errmsg("could not start initial contents copy for table \"%s.%s\": %s",
1242 : lrel.nspname, lrel.relname, res->err)));
1243 380 : walrcv_clear_result(res);
1244 :
1245 380 : copybuf = makeStringInfo();
1246 :
1247 380 : pstate = make_parsestate(NULL);
1248 380 : (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
1249 : NULL, false, false);
1250 :
1251 380 : attnamelist = make_copy_attnamelist(relmapentry);
1252 380 : cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
1253 :
1254 : /* Do the copy */
1255 378 : (void) CopyFrom(cstate);
1256 :
1257 364 : logicalrep_rel_close(relmapentry, NoLock);
1258 364 : }
1259 :
1260 : /*
1261 : * Determine the tablesync slot name.
1262 : *
1263 : * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
1264 : * on slot name length. We append system_identifier to avoid slot_name
1265 : * collision with subscriptions in other clusters. With the current scheme
1266 : * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
1267 : * length of slot_name will be 50.
1268 : *
1269 : * The returned slot name is stored in the supplied buffer (syncslotname) with
1270 : * the given size.
1271 : *
1272 : * Note: We don't use the subscription slot name as part of tablesync slot name
1273 : * because we are responsible for cleaning up these slots and it could become
1274 : * impossible to recalculate what name to cleanup if the subscription slot name
1275 : * had changed.
1276 : */
1277 : void
1278 760 : ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
1279 : char *syncslotname, Size szslot)
1280 : {
1281 760 : snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
1282 : relid, GetSystemIdentifier());
1283 760 : }
1284 :
1285 : /*
1286 : * Start syncing the table in the sync worker.
1287 : *
1288 : * If nothing needs to be done to sync the table, we exit the worker without
1289 : * any further action.
1290 : *
1291 : * The returned slot name is palloc'ed in current memory context.
1292 : */
1293 : static char *
1294 386 : LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
1295 : {
1296 : char *slotname;
1297 : char *err;
1298 : char relstate;
1299 : XLogRecPtr relstate_lsn;
1300 : Relation rel;
1301 : AclResult aclresult;
1302 : WalRcvExecResult *res;
1303 : char originname[NAMEDATALEN];
1304 : RepOriginId originid;
1305 : UserContext ucxt;
1306 : bool must_use_password;
1307 : bool run_as_owner;
1308 :
1309 : /* Check the state of the table synchronization. */
1310 386 : StartTransactionCommand();
1311 386 : relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
1312 386 : MyLogicalRepWorker->relid,
1313 : &relstate_lsn);
1314 386 : CommitTransactionCommand();
1315 :
1316 : /* Is the use of a password mandatory? */
1317 764 : must_use_password = MySubscription->passwordrequired &&
1318 378 : !MySubscription->ownersuperuser;
1319 :
1320 386 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1321 386 : MyLogicalRepWorker->relstate = relstate;
1322 386 : MyLogicalRepWorker->relstate_lsn = relstate_lsn;
1323 386 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1324 :
1325 : /*
1326 : * If synchronization is already done or no longer necessary, exit now
1327 : * that we've updated shared memory state.
1328 : */
1329 386 : switch (relstate)
1330 : {
1331 0 : case SUBREL_STATE_SYNCDONE:
1332 : case SUBREL_STATE_READY:
1333 : case SUBREL_STATE_UNKNOWN:
1334 0 : finish_sync_worker(); /* doesn't return */
1335 : }
1336 :
1337 : /* Calculate the name of the tablesync slot. */
1338 386 : slotname = (char *) palloc(NAMEDATALEN);
1339 386 : ReplicationSlotNameForTablesync(MySubscription->oid,
1340 386 : MyLogicalRepWorker->relid,
1341 : slotname,
1342 : NAMEDATALEN);
1343 :
1344 : /*
1345 : * Here we use the slot name instead of the subscription name as the
1346 : * application_name, so that it is different from the leader apply worker,
1347 : * so that synchronous replication can distinguish them.
1348 : */
1349 386 : LogRepWorkerWalRcvConn =
1350 386 : walrcv_connect(MySubscription->conninfo, true, true,
1351 : must_use_password,
1352 : slotname, &err);
1353 386 : if (LogRepWorkerWalRcvConn == NULL)
1354 0 : ereport(ERROR,
1355 : (errcode(ERRCODE_CONNECTION_FAILURE),
1356 : errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
1357 : MySubscription->name, err)));
1358 :
1359 : Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
1360 : MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
1361 : MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
1362 :
1363 : /* Assign the origin tracking record name. */
1364 386 : ReplicationOriginNameForLogicalRep(MySubscription->oid,
1365 386 : MyLogicalRepWorker->relid,
1366 : originname,
1367 : sizeof(originname));
1368 :
1369 386 : if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
1370 : {
1371 : /*
1372 : * We have previously errored out before finishing the copy so the
1373 : * replication slot might exist. We want to remove the slot if it
1374 : * already exists and proceed.
1375 : *
1376 : * XXX We could also instead try to drop the slot, last time we failed
1377 : * but for that, we might need to clean up the copy state as it might
1378 : * be in the middle of fetching the rows. Also, if there is a network
1379 : * breakdown then it wouldn't have succeeded so trying it next time
1380 : * seems like a better bet.
1381 : */
1382 14 : ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
1383 : }
1384 372 : else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
1385 : {
1386 : /*
1387 : * The COPY phase was previously done, but tablesync then crashed
1388 : * before it was able to finish normally.
1389 : */
1390 0 : StartTransactionCommand();
1391 :
1392 : /*
1393 : * The origin tracking name must already exist. It was created first
1394 : * time this tablesync was launched.
1395 : */
1396 0 : originid = replorigin_by_name(originname, false);
1397 0 : replorigin_session_setup(originid, 0);
1398 0 : replorigin_session_origin = originid;
1399 0 : *origin_startpos = replorigin_session_get_progress(false);
1400 :
1401 0 : CommitTransactionCommand();
1402 :
1403 0 : goto copy_table_done;
1404 : }
1405 :
1406 386 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1407 386 : MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
1408 386 : MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
1409 386 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1410 :
1411 : /* Update the state and make it visible to others. */
1412 386 : StartTransactionCommand();
1413 386 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1414 386 : MyLogicalRepWorker->relid,
1415 386 : MyLogicalRepWorker->relstate,
1416 386 : MyLogicalRepWorker->relstate_lsn);
1417 386 : CommitTransactionCommand();
1418 386 : pgstat_report_stat(true);
1419 :
1420 386 : StartTransactionCommand();
1421 :
1422 : /*
1423 : * Use a standard write lock here. It might be better to disallow access
1424 : * to the table while it's being synchronized. But we don't want to block
1425 : * the main apply process from working and it has to open the relation in
1426 : * RowExclusiveLock when remapping remote relation id to local one.
1427 : */
1428 386 : rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
1429 :
1430 : /*
1431 : * Start a transaction in the remote node in REPEATABLE READ mode. This
1432 : * ensures that both the replication slot we create (see below) and the
1433 : * COPY are consistent with each other.
1434 : */
1435 386 : res = walrcv_exec(LogRepWorkerWalRcvConn,
1436 : "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1437 : 0, NULL);
1438 386 : if (res->status != WALRCV_OK_COMMAND)
1439 0 : ereport(ERROR,
1440 : (errcode(ERRCODE_CONNECTION_FAILURE),
1441 : errmsg("table copy could not start transaction on publisher: %s",
1442 : res->err)));
1443 386 : walrcv_clear_result(res);
1444 :
1445 : /*
1446 : * Create a new permanent logical decoding slot. This slot will be used
1447 : * for the catchup phase after COPY is done, so tell it to use the
1448 : * snapshot to make the final data consistent.
1449 : */
1450 386 : walrcv_create_slot(LogRepWorkerWalRcvConn,
1451 : slotname, false /* permanent */ , false /* two_phase */ ,
1452 : MySubscription->failover,
1453 : CRS_USE_SNAPSHOT, origin_startpos);
1454 :
1455 : /*
1456 : * Setup replication origin tracking. The purpose of doing this before the
1457 : * copy is to avoid doing the copy again due to any error in setting up
1458 : * origin tracking.
1459 : */
1460 386 : originid = replorigin_by_name(originname, true);
1461 386 : if (!OidIsValid(originid))
1462 : {
1463 : /*
1464 : * Origin tracking does not exist, so create it now.
1465 : *
1466 : * Then advance to the LSN got from walrcv_create_slot. This is WAL
1467 : * logged for the purpose of recovery. Locks are to prevent the
1468 : * replication origin from vanishing while advancing.
1469 : */
1470 386 : originid = replorigin_create(originname);
1471 :
1472 386 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1473 386 : replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
1474 : true /* go backward */ , true /* WAL log */ );
1475 386 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1476 :
1477 386 : replorigin_session_setup(originid, 0);
1478 386 : replorigin_session_origin = originid;
1479 : }
1480 : else
1481 : {
1482 0 : ereport(ERROR,
1483 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1484 : errmsg("replication origin \"%s\" already exists",
1485 : originname)));
1486 : }
1487 :
1488 : /*
1489 : * Make sure that the copy command runs as the table owner, unless the
1490 : * user has opted out of that behaviour.
1491 : */
1492 386 : run_as_owner = MySubscription->runasowner;
1493 386 : if (!run_as_owner)
1494 384 : SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);
1495 :
1496 : /*
1497 : * Check that our table sync worker has permission to insert into the
1498 : * target table.
1499 : */
1500 386 : aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
1501 : ACL_INSERT);
1502 386 : if (aclresult != ACLCHECK_OK)
1503 0 : aclcheck_error(aclresult,
1504 0 : get_relkind_objtype(rel->rd_rel->relkind),
1505 0 : RelationGetRelationName(rel));
1506 :
1507 : /*
1508 : * COPY FROM does not honor RLS policies. That is not a problem for
1509 : * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
1510 : * who has it implicitly), but other roles should not be able to
1511 : * circumvent RLS. Disallow logical replication into RLS enabled
1512 : * relations for such roles.
1513 : */
1514 386 : if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
1515 0 : ereport(ERROR,
1516 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1517 : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1518 : GetUserNameFromId(GetUserId(), true),
1519 : RelationGetRelationName(rel))));
1520 :
1521 : /* Now do the initial data copy */
1522 386 : PushActiveSnapshot(GetTransactionSnapshot());
1523 386 : copy_table(rel);
1524 364 : PopActiveSnapshot();
1525 :
1526 364 : res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
1527 364 : if (res->status != WALRCV_OK_COMMAND)
1528 0 : ereport(ERROR,
1529 : (errcode(ERRCODE_CONNECTION_FAILURE),
1530 : errmsg("table copy could not finish transaction on publisher: %s",
1531 : res->err)));
1532 364 : walrcv_clear_result(res);
1533 :
1534 364 : if (!run_as_owner)
1535 362 : RestoreUserContext(&ucxt);
1536 :
1537 364 : table_close(rel, NoLock);
1538 :
1539 : /* Make the copy visible. */
1540 364 : CommandCounterIncrement();
1541 :
1542 : /*
1543 : * Update the persisted state to indicate the COPY phase is done; make it
1544 : * visible to others.
1545 : */
1546 364 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1547 364 : MyLogicalRepWorker->relid,
1548 : SUBREL_STATE_FINISHEDCOPY,
1549 364 : MyLogicalRepWorker->relstate_lsn);
1550 :
1551 364 : CommitTransactionCommand();
1552 :
1553 364 : copy_table_done:
1554 :
1555 364 : elog(DEBUG1,
1556 : "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
1557 : originname, LSN_FORMAT_ARGS(*origin_startpos));
1558 :
1559 : /*
1560 : * We are done with the initial data synchronization, update the state.
1561 : */
1562 364 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1563 364 : MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
1564 364 : MyLogicalRepWorker->relstate_lsn = *origin_startpos;
1565 364 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1566 :
1567 : /*
1568 : * Finally, wait until the leader apply worker tells us to catch up and
1569 : * then return to let LogicalRepApplyLoop do it.
1570 : */
1571 364 : wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
1572 364 : return slotname;
1573 : }
1574 :
1575 : /*
1576 : * Common code to fetch the up-to-date sync state info into the static lists.
1577 : *
1578 : * Returns true if subscription has 1 or more tables, else false.
1579 : *
1580 : * Note: If this function started the transaction (indicated by the parameter)
1581 : * then it is the caller's responsibility to commit it.
1582 : */
1583 : static bool
1584 11340 : FetchTableStates(bool *started_tx)
1585 : {
1586 : static bool has_subrels = false;
1587 :
1588 11340 : *started_tx = false;
1589 :
1590 11340 : if (table_states_validity != SYNC_TABLE_STATE_VALID)
1591 : {
1592 : MemoryContext oldctx;
1593 : List *rstates;
1594 : ListCell *lc;
1595 : SubscriptionRelState *rstate;
1596 :
1597 1686 : table_states_validity = SYNC_TABLE_STATE_REBUILD_STARTED;
1598 :
1599 : /* Clean the old lists. */
1600 1686 : list_free_deep(table_states_not_ready);
1601 1686 : table_states_not_ready = NIL;
1602 :
1603 1686 : if (!IsTransactionState())
1604 : {
1605 1654 : StartTransactionCommand();
1606 1654 : *started_tx = true;
1607 : }
1608 :
1609 : /* Fetch all non-ready tables. */
1610 1686 : rstates = GetSubscriptionRelations(MySubscription->oid, true);
1611 :
1612 : /* Allocate the tracking info in a permanent memory context. */
1613 1686 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1614 4518 : foreach(lc, rstates)
1615 : {
1616 2832 : rstate = palloc(sizeof(SubscriptionRelState));
1617 2832 : memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
1618 2832 : table_states_not_ready = lappend(table_states_not_ready, rstate);
1619 : }
1620 1686 : MemoryContextSwitchTo(oldctx);
1621 :
1622 : /*
1623 : * Does the subscription have tables?
1624 : *
1625 : * If there were not-READY relations found then we know it does. But
1626 : * if table_states_not_ready was empty we still need to check again to
1627 : * see if there are 0 tables.
1628 : */
1629 2132 : has_subrels = (table_states_not_ready != NIL) ||
1630 446 : HasSubscriptionRelations(MySubscription->oid);
1631 :
1632 : /*
1633 : * If the subscription relation cache has been invalidated since we
1634 : * entered this routine, we still use and return the relations we just
1635 : * finished constructing, to avoid infinite loops, but we leave the
1636 : * table states marked as stale so that we'll rebuild it again on next
1637 : * access. Otherwise, we mark the table states as valid.
1638 : */
1639 1686 : if (table_states_validity == SYNC_TABLE_STATE_REBUILD_STARTED)
1640 1686 : table_states_validity = SYNC_TABLE_STATE_VALID;
1641 : }
1642 :
1643 11340 : return has_subrels;
1644 : }
1645 :
1646 : /*
1647 : * Execute the initial sync with error handling. Disable the subscription,
1648 : * if it's required.
1649 : *
1650 : * Allocate the slot name in long-lived context on return. Note that we don't
1651 : * handle FATAL errors which are probably because of system resource error and
1652 : * are not repeatable.
1653 : */
1654 : static void
1655 386 : start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
1656 : {
1657 386 : char *sync_slotname = NULL;
1658 :
1659 : Assert(am_tablesync_worker());
1660 :
1661 386 : PG_TRY();
1662 : {
1663 : /* Call initial sync. */
1664 386 : sync_slotname = LogicalRepSyncTableStart(origin_startpos);
1665 : }
1666 22 : PG_CATCH();
1667 : {
1668 22 : if (MySubscription->disableonerr)
1669 2 : DisableSubscriptionAndExit();
1670 : else
1671 : {
1672 : /*
1673 : * Report the worker failed during table synchronization. Abort
1674 : * the current transaction so that the stats message is sent in an
1675 : * idle state.
1676 : */
1677 20 : AbortOutOfAnyTransaction();
1678 20 : pgstat_report_subscription_error(MySubscription->oid, false);
1679 :
1680 20 : PG_RE_THROW();
1681 : }
1682 : }
1683 364 : PG_END_TRY();
1684 :
1685 : /* allocate slot name in long-lived context */
1686 364 : *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
1687 364 : pfree(sync_slotname);
1688 364 : }
1689 :
1690 : /*
1691 : * Runs the tablesync worker.
1692 : *
1693 : * It starts syncing tables. After a successful sync, sets streaming options
1694 : * and starts streaming to catchup with apply worker.
1695 : */
1696 : static void
1697 386 : run_tablesync_worker()
1698 : {
1699 : char originname[NAMEDATALEN];
1700 386 : XLogRecPtr origin_startpos = InvalidXLogRecPtr;
1701 386 : char *slotname = NULL;
1702 : WalRcvStreamOptions options;
1703 :
1704 386 : start_table_sync(&origin_startpos, &slotname);
1705 :
1706 364 : ReplicationOriginNameForLogicalRep(MySubscription->oid,
1707 364 : MyLogicalRepWorker->relid,
1708 : originname,
1709 : sizeof(originname));
1710 :
1711 364 : set_apply_error_context_origin(originname);
1712 :
1713 364 : set_stream_options(&options, slotname, &origin_startpos);
1714 :
1715 364 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
1716 :
1717 : /* Apply the changes till we catchup with the apply worker. */
1718 364 : start_apply(origin_startpos);
1719 0 : }
1720 :
1721 : /* Logical Replication Tablesync worker entry point */
1722 : void
1723 388 : TablesyncWorkerMain(Datum main_arg)
1724 : {
1725 388 : int worker_slot = DatumGetInt32(main_arg);
1726 :
1727 388 : SetupApplyOrSyncWorker(worker_slot);
1728 :
1729 386 : run_tablesync_worker();
1730 :
1731 0 : finish_sync_worker();
1732 : }
1733 :
1734 : /*
1735 : * If the subscription has no tables then return false.
1736 : *
1737 : * Otherwise, are all tablesyncs READY?
1738 : *
1739 : * Note: This function is not suitable to be called from outside of apply or
1740 : * tablesync workers because MySubscription needs to be already initialized.
1741 : */
1742 : bool
1743 136 : AllTablesyncsReady(void)
1744 : {
1745 136 : bool started_tx = false;
1746 136 : bool has_subrels = false;
1747 :
1748 : /* We need up-to-date sync state info for subscription tables here. */
1749 136 : has_subrels = FetchTableStates(&started_tx);
1750 :
1751 136 : if (started_tx)
1752 : {
1753 32 : CommitTransactionCommand();
1754 32 : pgstat_report_stat(true);
1755 : }
1756 :
1757 : /*
1758 : * Return false when there are no tables in subscription or not all tables
1759 : * are in ready state; true otherwise.
1760 : */
1761 136 : return has_subrels && (table_states_not_ready == NIL);
1762 : }
1763 :
1764 : /*
1765 : * Update the two_phase state of the specified subscription in pg_subscription.
1766 : */
1767 : void
1768 20 : UpdateTwoPhaseState(Oid suboid, char new_state)
1769 : {
1770 : Relation rel;
1771 : HeapTuple tup;
1772 : bool nulls[Natts_pg_subscription];
1773 : bool replaces[Natts_pg_subscription];
1774 : Datum values[Natts_pg_subscription];
1775 :
1776 : Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
1777 : new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1778 : new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1779 :
1780 20 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1781 20 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
1782 20 : if (!HeapTupleIsValid(tup))
1783 0 : elog(ERROR,
1784 : "cache lookup failed for subscription oid %u",
1785 : suboid);
1786 :
1787 : /* Form a new tuple. */
1788 20 : memset(values, 0, sizeof(values));
1789 20 : memset(nulls, false, sizeof(nulls));
1790 20 : memset(replaces, false, sizeof(replaces));
1791 :
1792 : /* And update/set two_phase state */
1793 20 : values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1794 20 : replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1795 :
1796 20 : tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1797 : values, nulls, replaces);
1798 20 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1799 :
1800 20 : heap_freetuple(tup);
1801 20 : table_close(rel, RowExclusiveLock);
1802 20 : }
|