Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * tablesync.c
3 : * PostgreSQL logical replication: initial table data synchronization
4 : *
5 : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/tablesync.c
9 : *
10 : * NOTES
11 : * This file contains code for initial table data synchronization for
12 : * logical replication.
13 : *
14 : * The initial data synchronization is done separately for each table,
15 : * in a separate apply worker that only fetches the initial snapshot data
16 : * from the publisher and then synchronizes the position in the stream with
17 : * the leader apply worker.
18 : *
19 : * There are several reasons for doing the synchronization this way:
20 : * - It allows us to parallelize the initial data synchronization
21 : * which lowers the time needed for it to happen.
22 : * - The initial synchronization does not have to hold the xid and LSN
23 : * for the time it takes to copy data of all tables, causing less
24 : * bloat and lower disk consumption compared to doing the
25 : * synchronization in a single process for the whole database.
26 : * - It allows us to synchronize any tables added after the initial
27 : * synchronization has finished.
28 : *
29 : * The stream position synchronization works in multiple steps:
30 : * - Apply worker requests a tablesync worker to start, setting the new
31 : * table state to INIT.
32 : * - Tablesync worker starts; changes table state from INIT to DATASYNC while
33 : * copying.
34 : * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
35 : * worker specific) state to indicate when the copy phase has completed, so
36 : * if the worker crashes with this (non-memory) state then the copy will not
37 : * be re-attempted.
38 : * - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
39 : * - Apply worker periodically checks for tables in SYNCWAIT state. When
40 : * any appear, it sets the table state to CATCHUP and starts loop-waiting
41 : * until either the table state is set to SYNCDONE or the sync worker
42 : * exits.
43 : * - After the sync worker has seen the state change to CATCHUP, it will
44 : * read the stream and apply changes (acting like an apply worker) until
45 : * it catches up to the specified stream position. Then it sets the
46 : * state to SYNCDONE. There might be zero changes applied between
47 : * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
48 : * apply worker.
49 : * - Once the state is set to SYNCDONE, the apply will continue tracking
50 : * the table until it reaches the SYNCDONE stream position, at which
51 : * point it sets state to READY and stops tracking. Again, there might
52 : * be zero changes in between.
53 : *
54 : * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
55 : * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
56 : *
57 : * The catalog pg_subscription_rel is used to keep information about
58 : * subscribed tables and their state. The catalog holds all states
59 : * except SYNCWAIT and CATCHUP which are only in shared memory.
60 : *
61 : * Example flows look like this:
62 : * - Apply is in front:
63 : * sync:8
64 : * -> set in catalog FINISHEDCOPY
65 : * -> set in memory SYNCWAIT
66 : * apply:10
67 : * -> set in memory CATCHUP
68 : * -> enter wait-loop
69 : * sync:10
70 : * -> set in catalog SYNCDONE
71 : * -> exit
72 : * apply:10
73 : * -> exit wait-loop
74 : * -> continue rep
75 : * apply:11
76 : * -> set in catalog READY
77 : *
78 : * - Sync is in front:
79 : * sync:10
80 : * -> set in catalog FINISHEDCOPY
81 : * -> set in memory SYNCWAIT
82 : * apply:8
83 : * -> set in memory CATCHUP
84 : * -> continue per-table filtering
85 : * sync:10
86 : * -> set in catalog SYNCDONE
87 : * -> exit
88 : * apply:10
89 : * -> set in catalog READY
90 : * -> stop per-table filtering
91 : * -> continue rep
92 : *-------------------------------------------------------------------------
93 : */
94 :
95 : #include "postgres.h"
96 :
97 : #include "access/table.h"
98 : #include "access/xact.h"
99 : #include "catalog/indexing.h"
100 : #include "catalog/pg_subscription_rel.h"
101 : #include "catalog/pg_type.h"
102 : #include "commands/copy.h"
103 : #include "miscadmin.h"
104 : #include "nodes/makefuncs.h"
105 : #include "parser/parse_relation.h"
106 : #include "pgstat.h"
107 : #include "replication/logicallauncher.h"
108 : #include "replication/logicalrelation.h"
109 : #include "replication/logicalworker.h"
110 : #include "replication/origin.h"
111 : #include "replication/slot.h"
112 : #include "replication/walreceiver.h"
113 : #include "replication/worker_internal.h"
114 : #include "storage/ipc.h"
115 : #include "storage/lmgr.h"
116 : #include "utils/acl.h"
117 : #include "utils/array.h"
118 : #include "utils/builtins.h"
119 : #include "utils/lsyscache.h"
120 : #include "utils/memutils.h"
121 : #include "utils/rls.h"
122 : #include "utils/snapmgr.h"
123 : #include "utils/syscache.h"
124 : #include "utils/usercontext.h"
125 :
/*
 * Validity of the cached list of not-yet-READY table states
 * (table_states_not_ready).  invalidate_syncing_table_states() resets it to
 * SYNC_TABLE_STATE_NEEDS_REBUILD whenever a relevant syscache invalidation
 * arrives; the list is (re)built by FetchTableStates().
 */
typedef enum
{
	SYNC_TABLE_STATE_NEEDS_REBUILD,
	SYNC_TABLE_STATE_REBUILD_STARTED,
	SYNC_TABLE_STATE_VALID,
} SyncingTablesState;

/* Current validity of table_states_not_ready; starts out needing a build. */
static SyncingTablesState table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;

/*
 * SubscriptionRelState entries for subscription tables that are not READY
 * yet; walked by process_syncing_tables_for_apply().
 */
static List *table_states_not_ready = NIL;
static bool FetchTableStates(bool *started_tx);

/*
 * Most recent chunk of COPY data received from the publisher.
 * copy_read_data() hands out its unread tail (data[cursor..len)) on the next
 * call.  NOTE(review): allocation happens outside this excerpt -- confirm it
 * is set up before COPY starts.
 */
static StringInfo copybuf = NULL;
138 :
139 : /*
140 : * Exit routine for synchronization worker.
141 : */
142 : pg_noreturn static void
143 366 : finish_sync_worker(void)
144 : {
145 : /*
146 : * Commit any outstanding transaction. This is the usual case, unless
147 : * there was nothing to do for the table.
148 : */
149 366 : if (IsTransactionState())
150 : {
151 366 : CommitTransactionCommand();
152 366 : pgstat_report_stat(true);
153 : }
154 :
155 : /* And flush all writes. */
156 366 : XLogFlush(GetXLogWriteRecPtr());
157 :
158 366 : StartTransactionCommand();
159 366 : ereport(LOG,
160 : (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
161 : MySubscription->name,
162 : get_rel_name(MyLogicalRepWorker->relid))));
163 366 : CommitTransactionCommand();
164 :
165 : /* Find the leader apply worker and signal it. */
166 366 : logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
167 :
168 : /* Stop gracefully */
169 366 : proc_exit(0);
170 : }
171 :
172 : /*
173 : * Wait until the relation sync state is set in the catalog to the expected
174 : * one; return true when it happens.
175 : *
176 : * Returns false if the table sync worker or the table itself have
177 : * disappeared, or the table state has been reset.
178 : *
179 : * Currently, this is used in the apply worker when transitioning from
180 : * CATCHUP state to SYNCDONE.
181 : */
182 : static bool
183 362 : wait_for_relation_state_change(Oid relid, char expected_state)
184 : {
185 : char state;
186 :
187 : for (;;)
188 394 : {
189 : LogicalRepWorker *worker;
190 : XLogRecPtr statelsn;
191 :
192 756 : CHECK_FOR_INTERRUPTS();
193 :
194 756 : InvalidateCatalogSnapshot();
195 756 : state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
196 : relid, &statelsn);
197 :
198 756 : if (state == SUBREL_STATE_UNKNOWN)
199 0 : break;
200 :
201 756 : if (state == expected_state)
202 362 : return true;
203 :
204 : /* Check if the sync worker is still running and bail if not. */
205 394 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
206 394 : worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
207 : false);
208 394 : LWLockRelease(LogicalRepWorkerLock);
209 394 : if (!worker)
210 0 : break;
211 :
212 394 : (void) WaitLatch(MyLatch,
213 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
214 : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
215 :
216 394 : ResetLatch(MyLatch);
217 : }
218 :
219 0 : return false;
220 : }
221 :
222 : /*
223 : * Wait until the apply worker changes the state of our synchronization
224 : * worker to the expected one.
225 : *
226 : * Used when transitioning from SYNCWAIT state to CATCHUP.
227 : *
228 : * Returns false if the apply worker has disappeared.
229 : */
230 : static bool
231 366 : wait_for_worker_state_change(char expected_state)
232 : {
233 : int rc;
234 :
235 : for (;;)
236 366 : {
237 : LogicalRepWorker *worker;
238 :
239 732 : CHECK_FOR_INTERRUPTS();
240 :
241 : /*
242 : * Done if already in correct state. (We assume this fetch is atomic
243 : * enough to not give a misleading answer if we do it with no lock.)
244 : */
245 732 : if (MyLogicalRepWorker->relstate == expected_state)
246 366 : return true;
247 :
248 : /*
249 : * Bail out if the apply worker has died, else signal it we're
250 : * waiting.
251 : */
252 366 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
253 366 : worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
254 : InvalidOid, false);
255 366 : if (worker && worker->proc)
256 366 : logicalrep_worker_wakeup_ptr(worker);
257 366 : LWLockRelease(LogicalRepWorkerLock);
258 366 : if (!worker)
259 0 : break;
260 :
261 : /*
262 : * Wait. We expect to get a latch signal back from the apply worker,
263 : * but use a timeout in case it dies without sending one.
264 : */
265 366 : rc = WaitLatch(MyLatch,
266 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
267 : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
268 :
269 366 : if (rc & WL_LATCH_SET)
270 366 : ResetLatch(MyLatch);
271 : }
272 :
273 0 : return false;
274 : }
275 :
276 : /*
277 : * Callback from syscache invalidation.
278 : */
279 : void
280 3428 : invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
281 : {
282 3428 : table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
283 3428 : }
284 :
/*
 * Handle table synchronization cooperation from the synchronization
 * worker.
 *
 * If the sync worker is in CATCHUP state and reached (or passed) the
 * predetermined synchronization point in the WAL stream, mark the table as
 * SYNCDONE and finish.
 *
 * In that case this function does not return.  It:
 *   1. records SYNCDONE (with current_lsn) in shared memory and the catalog;
 *   2. ends streaming and drops the tablesync slot on the publisher;
 *   3. drops the tablesync origin tracking;
 *   4. calls finish_sync_worker(), which exits the process.
 * Otherwise it only releases relmutex and returns.
 *
 * current_lsn: the stream position the sync worker has reached.
 */
static void
process_syncing_tables_for_sync(XLogRecPtr current_lsn)
{
	/* relstate and relstate_lsn are read and updated under relmutex. */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);

	if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
		current_lsn >= MyLogicalRepWorker->relstate_lsn)
	{
		TimeLineID	tli;
		char		syncslotname[NAMEDATALEN] = {0};
		char		originname[NAMEDATALEN] = {0};

		MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
		MyLogicalRepWorker->relstate_lsn = current_lsn;

		/* Release the spinlock before any catalog or network work below. */
		SpinLockRelease(&MyLogicalRepWorker->relmutex);

		/*
		 * UpdateSubscriptionRelState must be called within a transaction.
		 */
		if (!IsTransactionState())
			StartTransactionCommand();

		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
								   MyLogicalRepWorker->relid,
								   MyLogicalRepWorker->relstate,
								   MyLogicalRepWorker->relstate_lsn);

		/*
		 * End streaming so that LogRepWorkerWalRcvConn can be used to drop
		 * the slot.
		 */
		walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);

		/*
		 * Cleanup the tablesync slot.
		 *
		 * This has to be done after updating the state because otherwise if
		 * there is an error while doing the database operations we won't be
		 * able to rollback dropped slot.
		 */
		ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
										MyLogicalRepWorker->relid,
										syncslotname,
										sizeof(syncslotname));

		/*
		 * It is important to give an error if we are unable to drop the slot,
		 * otherwise, it won't be dropped till the corresponding subscription
		 * is dropped. So passing missing_ok = false.
		 */
		ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);

		/* Make the SYNCDONE catalog update durable/visible. */
		CommitTransactionCommand();
		pgstat_report_stat(false);

		/*
		 * Start a new transaction to clean up the tablesync origin tracking.
		 * This transaction will be ended within the finish_sync_worker().
		 * Now, even, if we fail to remove this here, the apply worker will
		 * ensure to clean it up afterward.
		 *
		 * We need to do this after the table state is set to SYNCDONE.
		 * Otherwise, if an error occurs while performing the database
		 * operation, the worker will be restarted and the in-memory state of
		 * replication progress (remote_lsn) won't be rolled-back which would
		 * have been cleared before restart. So, the restarted worker will use
		 * invalid replication progress state resulting in replay of
		 * transactions that have already been applied.
		 */
		StartTransactionCommand();

		ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
										   MyLogicalRepWorker->relid,
										   originname,
										   sizeof(originname));

		/*
		 * Resetting the origin session removes the ownership of the slot.
		 * This is needed to allow the origin to be dropped.
		 */
		replorigin_session_reset();
		replorigin_session_origin = InvalidRepOriginId;
		replorigin_session_origin_lsn = InvalidXLogRecPtr;
		replorigin_session_origin_timestamp = 0;

		/*
		 * Drop the tablesync's origin tracking if exists.
		 *
		 * There is a chance that the user is concurrently performing refresh
		 * for the subscription where we remove the table state and its origin
		 * or the apply worker would have removed this origin. So passing
		 * missing_ok = true.
		 */
		replorigin_drop_by_name(originname, true, false);

		/* Commits the open transaction and exits; does not return. */
		finish_sync_worker();
	}
	else
		SpinLockRelease(&MyLogicalRepWorker->relmutex);
}
394 :
/*
 * Handle table synchronization cooperation from the apply worker.
 *
 * Walk over all subscription tables that are individually tracked by the
 * apply process (currently, all that have state other than
 * SUBREL_STATE_READY) and manage synchronization for them.
 *
 * If there are tables that need synchronizing and are not being synchronized
 * yet, start sync workers for them (if there are free slots for sync
 * workers). To prevent starting the sync worker for the same relation at a
 * high frequency after a failure, we store its last start time with each sync
 * state info. We start the sync worker for the same relation after waiting
 * at least wal_retrieve_retry_interval.
 *
 * For tables that are being synchronized already, check if sync workers
 * either need action from the apply worker or have finished. This is the
 * SYNCWAIT to CATCHUP transition.
 *
 * If the synchronization position is reached (SYNCDONE), then the table can
 * be marked as READY and is no longer tracked.
 *
 * Must be called outside a transaction.  Any transaction it starts is
 * committed before it returns.  If two_phase is PENDING and all tablesyncs
 * are now READY, the apply worker exits via proc_exit(0) so that it can be
 * restarted with two_phase enabled.
 */
static void
process_syncing_tables_for_apply(XLogRecPtr current_lsn)
{
	/* Per-relation timestamp of the last sync worker launch attempt. */
	struct tablesync_start_time_mapping
	{
		Oid			relid;
		TimestampTz last_start_time;
	};
	/* Persists across calls; created/destroyed on demand below. */
	static HTAB *last_start_times = NULL;
	ListCell   *lc;
	/* Set when FetchTableStates or this function opened a transaction. */
	bool		started_tx = false;
	/* Set when the worker must restart to enable two_phase. */
	bool		should_exit = false;

	Assert(!IsTransactionState());

	/* We need up-to-date sync state info for subscription tables here. */
	FetchTableStates(&started_tx);

	/*
	 * Prepare a hash table for tracking last start times of workers, to avoid
	 * immediate restarts. We don't need it if there are no tables that need
	 * syncing.
	 */
	if (table_states_not_ready != NIL && !last_start_times)
	{
		HASHCTL		ctl;

		ctl.keysize = sizeof(Oid);
		ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
		last_start_times = hash_create("Logical replication table sync worker start times",
									   256, &ctl, HASH_ELEM | HASH_BLOBS);
	}

	/*
	 * Clean up the hash table when we're done with all tables (just to
	 * release the bit of memory).
	 */
	else if (table_states_not_ready == NIL && last_start_times)
	{
		hash_destroy(last_start_times);
		last_start_times = NULL;
	}

	/*
	 * Process all tables that are being synchronized.
	 */
	foreach(lc, table_states_not_ready)
	{
		SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);

		if (rstate->state == SUBREL_STATE_SYNCDONE)
		{
			/*
			 * Apply has caught up to the position where the table sync has
			 * finished. Mark the table as ready so that the apply will just
			 * continue to replicate it normally.
			 */
			if (current_lsn >= rstate->lsn)
			{
				char		originname[NAMEDATALEN];

				rstate->state = SUBREL_STATE_READY;
				rstate->lsn = current_lsn;
				if (!started_tx)
				{
					StartTransactionCommand();
					started_tx = true;
				}

				/*
				 * Remove the tablesync origin tracking if exists.
				 *
				 * There is a chance that the user is concurrently performing
				 * refresh for the subscription where we remove the table
				 * state and its origin or the tablesync worker would have
				 * already removed this origin. We can't rely on tablesync
				 * worker to remove the origin tracking as if there is any
				 * error while dropping we won't restart it to drop the
				 * origin. So passing missing_ok = true.
				 */
				ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
												   rstate->relid,
												   originname,
												   sizeof(originname));
				replorigin_drop_by_name(originname, true, false);

				/*
				 * Update the state to READY only after the origin cleanup.
				 */
				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
										   rstate->relid, rstate->state,
										   rstate->lsn);
			}
		}
		else
		{
			LogicalRepWorker *syncworker;

			/*
			 * Look for a sync worker for this relation.
			 */
			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

			syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
												rstate->relid, false);

			if (syncworker)
			{
				/* Found one, update our copy of its state */
				SpinLockAcquire(&syncworker->relmutex);
				rstate->state = syncworker->relstate;
				rstate->lsn = syncworker->relstate_lsn;
				if (rstate->state == SUBREL_STATE_SYNCWAIT)
				{
					/*
					 * Sync worker is waiting for apply. Tell sync worker it
					 * can catchup now.
					 */
					syncworker->relstate = SUBREL_STATE_CATCHUP;
					syncworker->relstate_lsn =
						Max(syncworker->relstate_lsn, current_lsn);
				}
				SpinLockRelease(&syncworker->relmutex);

				/*
				 * rstate->state still holds the SYNCWAIT value copied above,
				 * so this branch runs exactly when we just moved the worker
				 * to CATCHUP.
				 */
				/* If we told worker to catch up, wait for it. */
				if (rstate->state == SUBREL_STATE_SYNCWAIT)
				{
					/* Signal the sync worker, as it may be waiting for us. */
					if (syncworker->proc)
						logicalrep_worker_wakeup_ptr(syncworker);

					/* Now safe to release the LWLock */
					LWLockRelease(LogicalRepWorkerLock);

					if (started_tx)
					{
						/*
						 * We must commit the existing transaction to release
						 * the existing locks before entering a busy loop.
						 * This is required to avoid any undetected deadlocks
						 * due to any existing lock as deadlock detector won't
						 * be able to detect the waits on the latch.
						 */
						CommitTransactionCommand();
						pgstat_report_stat(false);
					}

					/*
					 * Enter busy loop and wait for synchronization worker to
					 * reach expected state (or die trying).
					 */
					StartTransactionCommand();
					started_tx = true;

					wait_for_relation_state_change(rstate->relid,
												   SUBREL_STATE_SYNCDONE);
				}
				else
					LWLockRelease(LogicalRepWorkerLock);
			}
			else
			{
				/*
				 * If there is no sync worker for this table yet, count
				 * running sync workers for this subscription, while we have
				 * the lock.
				 */
				int			nsyncworkers =
					logicalrep_sync_worker_count(MyLogicalRepWorker->subid);

				/* Now safe to release the LWLock */
				LWLockRelease(LogicalRepWorkerLock);

				/*
				 * If there are free sync worker slot(s), start a new sync
				 * worker for the table.
				 */
				if (nsyncworkers < max_sync_workers_per_subscription)
				{
					TimestampTz now = GetCurrentTimestamp();
					struct tablesync_start_time_mapping *hentry;
					bool		found;

					hentry = hash_search(last_start_times, &rstate->relid,
										 HASH_ENTER, &found);

					/*
					 * Launch only on the first attempt for this relation, or
					 * once wal_retrieve_retry_interval has passed since the
					 * previous attempt.
					 */
					if (!found ||
						TimestampDifferenceExceeds(hentry->last_start_time, now,
												   wal_retrieve_retry_interval))
					{
						/*
						 * Set the last_start_time even if we fail to start
						 * the worker, so that we won't retry until
						 * wal_retrieve_retry_interval has elapsed.
						 */
						hentry->last_start_time = now;
						(void) logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
														MyLogicalRepWorker->dbid,
														MySubscription->oid,
														MySubscription->name,
														MyLogicalRepWorker->userid,
														rstate->relid,
														DSM_HANDLE_INVALID,
														false);
					}
				}
			}
		}
	}

	if (started_tx)
	{
		/*
		 * Even when the two_phase mode is requested by the user, it remains
		 * as 'pending' until all tablesyncs have reached READY state.
		 *
		 * When this happens, we restart the apply worker and (if the
		 * conditions are still ok) then the two_phase tri-state will become
		 * 'enabled' at that time.
		 *
		 * Note: If the subscription has no tables then leave the state as
		 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
		 * work.
		 */
		if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
		{
			CommandCounterIncrement();	/* make updates visible */
			if (AllTablesyncsReady())
			{
				ereport(LOG,
						(errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
								MySubscription->name)));
				should_exit = true;
			}
		}

		CommitTransactionCommand();
		pgstat_report_stat(true);
	}

	if (should_exit)
	{
		/*
		 * Reset the last-start time for this worker so that the launcher will
		 * restart it without waiting for wal_retrieve_retry_interval.
		 */
		ApplyLauncherForgetWorkerStartTime(MySubscription->oid);

		proc_exit(0);
	}
}
667 :
668 : /*
669 : * Process possible state change(s) of tables that are being synchronized.
670 : */
671 : void
672 8884 : process_syncing_tables(XLogRecPtr current_lsn)
673 : {
674 8884 : switch (MyLogicalRepWorker->type)
675 : {
676 44 : case WORKERTYPE_PARALLEL_APPLY:
677 :
678 : /*
679 : * Skip for parallel apply workers because they only operate on
680 : * tables that are in a READY state. See pa_can_start() and
681 : * should_apply_changes_for_rel().
682 : */
683 44 : break;
684 :
685 426 : case WORKERTYPE_TABLESYNC:
686 426 : process_syncing_tables_for_sync(current_lsn);
687 60 : break;
688 :
689 8414 : case WORKERTYPE_APPLY:
690 8414 : process_syncing_tables_for_apply(current_lsn);
691 8402 : break;
692 :
693 0 : case WORKERTYPE_UNKNOWN:
694 : /* Should never happen. */
695 0 : elog(ERROR, "Unknown worker type");
696 : }
697 8506 : }
698 :
699 : /*
700 : * Create list of columns for COPY based on logical relation mapping.
701 : */
702 : static List *
703 384 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
704 : {
705 384 : List *attnamelist = NIL;
706 : int i;
707 :
708 1030 : for (i = 0; i < rel->remoterel.natts; i++)
709 : {
710 646 : attnamelist = lappend(attnamelist,
711 646 : makeString(rel->remoterel.attnames[i]));
712 : }
713 :
714 :
715 384 : return attnamelist;
716 : }
717 :
718 : /*
719 : * Data source callback for the COPY FROM, which reads from the remote
720 : * connection and passes the data back to our local COPY.
721 : */
722 : static int
723 21966 : copy_read_data(void *outbuf, int minread, int maxread)
724 : {
725 21966 : int bytesread = 0;
726 : int avail;
727 :
728 : /* If there are some leftover data from previous read, use it. */
729 21966 : avail = copybuf->len - copybuf->cursor;
730 21966 : if (avail)
731 : {
732 0 : if (avail > maxread)
733 0 : avail = maxread;
734 0 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
735 0 : copybuf->cursor += avail;
736 0 : maxread -= avail;
737 0 : bytesread += avail;
738 : }
739 :
740 21986 : while (maxread > 0 && bytesread < minread)
741 : {
742 21986 : pgsocket fd = PGINVALID_SOCKET;
743 : int len;
744 21986 : char *buf = NULL;
745 :
746 : for (;;)
747 : {
748 : /* Try read the data. */
749 21986 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
750 :
751 21986 : CHECK_FOR_INTERRUPTS();
752 :
753 21986 : if (len == 0)
754 20 : break;
755 21966 : else if (len < 0)
756 21966 : return bytesread;
757 : else
758 : {
759 : /* Process the data */
760 21586 : copybuf->data = buf;
761 21586 : copybuf->len = len;
762 21586 : copybuf->cursor = 0;
763 :
764 21586 : avail = copybuf->len - copybuf->cursor;
765 21586 : if (avail > maxread)
766 0 : avail = maxread;
767 21586 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
768 21586 : outbuf = (char *) outbuf + avail;
769 21586 : copybuf->cursor += avail;
770 21586 : maxread -= avail;
771 21586 : bytesread += avail;
772 : }
773 :
774 21586 : if (maxread <= 0 || bytesread >= minread)
775 21586 : return bytesread;
776 : }
777 :
778 : /*
779 : * Wait for more data or latch.
780 : */
781 20 : (void) WaitLatchOrSocket(MyLatch,
782 : WL_SOCKET_READABLE | WL_LATCH_SET |
783 : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
784 : fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
785 :
786 20 : ResetLatch(MyLatch);
787 : }
788 :
789 0 : return bytesread;
790 : }
791 :
792 :
793 : /*
794 : * Get information about remote relation in similar fashion the RELATION
795 : * message provides during replication.
796 : *
797 : * This function also returns (a) the relation qualifications to be used in
798 : * the COPY command, and (b) whether the remote relation has published any
799 : * generated column.
800 : */
801 : static void
802 388 : fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
803 : List **qual, bool *gencol_published)
804 : {
805 : WalRcvExecResult *res;
806 : StringInfoData cmd;
807 : TupleTableSlot *slot;
808 388 : Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
809 388 : Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
810 388 : Oid qualRow[] = {TEXTOID};
811 : bool isnull;
812 : int natt;
813 388 : StringInfo pub_names = NULL;
814 388 : Bitmapset *included_cols = NULL;
815 388 : int server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
816 :
817 388 : lrel->nspname = nspname;
818 388 : lrel->relname = relname;
819 :
820 : /* First fetch Oid and replica identity. */
821 388 : initStringInfo(&cmd);
822 388 : appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
823 : " FROM pg_catalog.pg_class c"
824 : " INNER JOIN pg_catalog.pg_namespace n"
825 : " ON (c.relnamespace = n.oid)"
826 : " WHERE n.nspname = %s"
827 : " AND c.relname = %s",
828 : quote_literal_cstr(nspname),
829 : quote_literal_cstr(relname));
830 388 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
831 : lengthof(tableRow), tableRow);
832 :
833 388 : if (res->status != WALRCV_OK_TUPLES)
834 0 : ereport(ERROR,
835 : (errcode(ERRCODE_CONNECTION_FAILURE),
836 : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
837 : nspname, relname, res->err)));
838 :
839 388 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
840 388 : if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
841 0 : ereport(ERROR,
842 : (errcode(ERRCODE_UNDEFINED_OBJECT),
843 : errmsg("table \"%s.%s\" not found on publisher",
844 : nspname, relname)));
845 :
846 388 : lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
847 : Assert(!isnull);
848 388 : lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
849 : Assert(!isnull);
850 388 : lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
851 : Assert(!isnull);
852 :
853 388 : ExecDropSingleTupleTableSlot(slot);
854 388 : walrcv_clear_result(res);
855 :
856 :
857 : /*
858 : * Get column lists for each relation.
859 : *
860 : * We need to do this before fetching info about column names and types,
861 : * so that we can skip columns that should not be replicated.
862 : */
863 388 : if (server_version >= 150000)
864 : {
865 : WalRcvExecResult *pubres;
866 : TupleTableSlot *tslot;
867 388 : Oid attrsRow[] = {INT2VECTOROID};
868 :
869 : /* Build the pub_names comma-separated string. */
870 388 : pub_names = makeStringInfo();
871 388 : GetPublicationsStr(MySubscription->publications, pub_names, true);
872 :
873 : /*
874 : * Fetch info about column lists for the relation (from all the
875 : * publications).
876 : */
877 388 : resetStringInfo(&cmd);
878 388 : appendStringInfo(&cmd,
879 : "SELECT DISTINCT"
880 : " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
881 : " THEN NULL ELSE gpt.attrs END)"
882 : " FROM pg_publication p,"
883 : " LATERAL pg_get_publication_tables(p.pubname) gpt,"
884 : " pg_class c"
885 : " WHERE gpt.relid = %u AND c.oid = gpt.relid"
886 : " AND p.pubname IN ( %s )",
887 : lrel->remoteid,
888 : pub_names->data);
889 :
890 388 : pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
891 : lengthof(attrsRow), attrsRow);
892 :
893 388 : if (pubres->status != WALRCV_OK_TUPLES)
894 0 : ereport(ERROR,
895 : (errcode(ERRCODE_CONNECTION_FAILURE),
896 : errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
897 : nspname, relname, pubres->err)));
898 :
899 : /*
900 : * We don't support the case where the column list is different for
901 : * the same table when combining publications. See comments atop
902 : * fetch_table_list. So there should be only one row returned.
903 : * Although we already checked this when creating the subscription, we
904 : * still need to check here in case the column list was changed after
905 : * creating the subscription and before the sync worker is started.
906 : */
907 388 : if (tuplestore_tuple_count(pubres->tuplestore) > 1)
908 0 : ereport(ERROR,
909 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
910 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
911 : nspname, relname));
912 :
913 : /*
914 : * Get the column list and build a single bitmap with the attnums.
915 : *
916 : * If we find a NULL value, it means all the columns should be
917 : * replicated.
918 : */
919 388 : tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
920 388 : if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
921 : {
922 388 : Datum cfval = slot_getattr(tslot, 1, &isnull);
923 :
924 388 : if (!isnull)
925 : {
926 : ArrayType *arr;
927 : int nelems;
928 : int16 *elems;
929 :
930 44 : arr = DatumGetArrayTypeP(cfval);
931 44 : nelems = ARR_DIMS(arr)[0];
932 44 : elems = (int16 *) ARR_DATA_PTR(arr);
933 :
934 118 : for (natt = 0; natt < nelems; natt++)
935 74 : included_cols = bms_add_member(included_cols, elems[natt]);
936 : }
937 :
938 388 : ExecClearTuple(tslot);
939 : }
940 388 : ExecDropSingleTupleTableSlot(tslot);
941 :
942 388 : walrcv_clear_result(pubres);
943 : }
944 :
945 : /*
946 : * Now fetch column names and types.
947 : */
948 388 : resetStringInfo(&cmd);
949 388 : appendStringInfoString(&cmd,
950 : "SELECT a.attnum,"
951 : " a.attname,"
952 : " a.atttypid,"
953 : " a.attnum = ANY(i.indkey)");
954 :
955 : /* Generated columns can be replicated since version 18. */
956 388 : if (server_version >= 180000)
957 388 : appendStringInfoString(&cmd, ", a.attgenerated != ''");
958 :
959 776 : appendStringInfo(&cmd,
960 : " FROM pg_catalog.pg_attribute a"
961 : " LEFT JOIN pg_catalog.pg_index i"
962 : " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
963 : " WHERE a.attnum > 0::pg_catalog.int2"
964 : " AND NOT a.attisdropped %s"
965 : " AND a.attrelid = %u"
966 : " ORDER BY a.attnum",
967 : lrel->remoteid,
968 388 : (server_version >= 120000 && server_version < 180000 ?
969 : "AND a.attgenerated = ''" : ""),
970 : lrel->remoteid);
971 388 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
972 : server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
973 :
974 388 : if (res->status != WALRCV_OK_TUPLES)
975 0 : ereport(ERROR,
976 : (errcode(ERRCODE_CONNECTION_FAILURE),
977 : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
978 : nspname, relname, res->err)));
979 :
980 : /* We don't know the number of rows coming, so allocate enough space. */
981 388 : lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
982 388 : lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
983 388 : lrel->attkeys = NULL;
984 :
985 : /*
986 : * Store the columns as a list of names. Ignore those that are not
987 : * present in the column list, if there is one.
988 : */
989 388 : natt = 0;
990 388 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
991 1108 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
992 : {
993 : char *rel_colname;
994 : AttrNumber attnum;
995 :
996 720 : attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
997 : Assert(!isnull);
998 :
999 : /* If the column is not in the column list, skip it. */
1000 720 : if (included_cols != NULL && !bms_is_member(attnum, included_cols))
1001 : {
1002 62 : ExecClearTuple(slot);
1003 62 : continue;
1004 : }
1005 :
1006 658 : rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
1007 : Assert(!isnull);
1008 :
1009 658 : lrel->attnames[natt] = rel_colname;
1010 658 : lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
1011 : Assert(!isnull);
1012 :
1013 658 : if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
1014 214 : lrel->attkeys = bms_add_member(lrel->attkeys, natt);
1015 :
1016 : /* Remember if the remote table has published any generated column. */
1017 658 : if (server_version >= 180000 && !(*gencol_published))
1018 : {
1019 658 : *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
1020 : Assert(!isnull);
1021 : }
1022 :
1023 : /* Should never happen. */
1024 658 : if (++natt >= MaxTupleAttributeNumber)
1025 0 : elog(ERROR, "too many columns in remote table \"%s.%s\"",
1026 : nspname, relname);
1027 :
1028 658 : ExecClearTuple(slot);
1029 : }
1030 388 : ExecDropSingleTupleTableSlot(slot);
1031 :
1032 388 : lrel->natts = natt;
1033 :
1034 388 : walrcv_clear_result(res);
1035 :
1036 : /*
1037 : * Get relation's row filter expressions. DISTINCT avoids the same
1038 : * expression of a table in multiple publications from being included
1039 : * multiple times in the final expression.
1040 : *
1041 : * We need to copy the row even if it matches just one of the
1042 : * publications, so we later combine all the quals with OR.
1043 : *
1044 : * For initial synchronization, row filtering can be ignored in following
1045 : * cases:
1046 : *
1047 : * 1) one of the subscribed publications for the table hasn't specified
1048 : * any row filter
1049 : *
1050 : * 2) one of the subscribed publications has puballtables set to true
1051 : *
1052 : * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
1053 : * that includes this relation
1054 : */
1055 388 : if (server_version >= 150000)
1056 : {
1057 : /* Reuse the already-built pub_names. */
1058 : Assert(pub_names != NULL);
1059 :
1060 : /* Check for row filters. */
1061 388 : resetStringInfo(&cmd);
1062 388 : appendStringInfo(&cmd,
1063 : "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
1064 : " FROM pg_publication p,"
1065 : " LATERAL pg_get_publication_tables(p.pubname) gpt"
1066 : " WHERE gpt.relid = %u"
1067 : " AND p.pubname IN ( %s )",
1068 : lrel->remoteid,
1069 : pub_names->data);
1070 :
1071 388 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
1072 :
1073 388 : if (res->status != WALRCV_OK_TUPLES)
1074 0 : ereport(ERROR,
1075 : (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
1076 : nspname, relname, res->err)));
1077 :
1078 : /*
1079 : * Multiple row filter expressions for the same table will be combined
1080 : * by COPY using OR. If any of the filter expressions for this table
1081 : * are null, it means the whole table will be copied. In this case it
1082 : * is not necessary to construct a unified row filter expression at
1083 : * all.
1084 : */
1085 388 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
1086 418 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1087 : {
1088 396 : Datum rf = slot_getattr(slot, 1, &isnull);
1089 :
1090 396 : if (!isnull)
1091 30 : *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
1092 : else
1093 : {
1094 : /* Ignore filters and cleanup as necessary. */
1095 366 : if (*qual)
1096 : {
1097 6 : list_free_deep(*qual);
1098 6 : *qual = NIL;
1099 : }
1100 366 : break;
1101 : }
1102 :
1103 30 : ExecClearTuple(slot);
1104 : }
1105 388 : ExecDropSingleTupleTableSlot(slot);
1106 :
1107 388 : walrcv_clear_result(res);
1108 388 : destroyStringInfo(pub_names);
1109 : }
1110 :
1111 388 : pfree(cmd.data);
1112 388 : }
1113 :
/*
 * Copy existing data of a table from publisher.
 *
 * Steps:
 *   1. Ask the publisher for the relation's column list, row filters and
 *      whether generated columns are published (fetch_remote_table_info).
 *   2. Register the remote relation in the relation map and open the mapping
 *      to the local relation.
 *   3. Build and start a "COPY ... TO STDOUT" on the publisher.
 *   4. Run a local COPY FROM into "rel", using copy_read_data (defined
 *      elsewhere in this file) as the data source.
 *
 * Caller is responsible for locking the local relation.  That is why the
 * relmap entry is opened and closed with NoLock here.
 */
static void
copy_table(Relation rel)
{
	LogicalRepRelMapEntry *relmapentry;
	LogicalRepRelation lrel;
	List	   *qual = NIL;
	WalRcvExecResult *res;
	StringInfoData cmd;
	CopyFromState cstate;
	List	   *attnamelist;
	ParseState *pstate;
	List	   *options = NIL;
	bool		gencol_published = false;

	/* Get the publisher relation info. */
	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
							RelationGetRelationName(rel), &lrel, &qual,
							&gencol_published);

	/* Put the relation into relmap. */
	logicalrep_relmap_update(&lrel);

	/* Map the publisher relation to local one. */
	relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
	Assert(rel == relmapentry->localrel);

	/* Start copy on the publisher. */
	initStringInfo(&cmd);

	/*
	 * Regular table with no row filter or generated columns.  The plain
	 * "COPY table (cols) TO STDOUT" form can be used here.
	 */
	if (lrel.relkind == RELKIND_RELATION && qual == NIL && !gencol_published)
	{
		appendStringInfo(&cmd, "COPY %s",
						 quote_qualified_identifier(lrel.nspname, lrel.relname));

		/* If the table has columns, then specify the columns */
		if (lrel.natts)
		{
			appendStringInfoString(&cmd, " (");

			/*
			 * XXX Do we need to list the columns in all cases? Maybe we're
			 * replicating all columns?
			 */
			for (int i = 0; i < lrel.natts; i++)
			{
				if (i > 0)
					appendStringInfoString(&cmd, ", ");

				appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
			}

			appendStringInfoChar(&cmd, ')');
		}

		appendStringInfoString(&cmd, " TO STDOUT");
	}
	else
	{
		/*
		 * For non-tables and tables with row filters, we need to do COPY
		 * (SELECT ...), but we can't just do SELECT * because we may need to
		 * copy only subset of columns including generated columns. For tables
		 * with any row filters, build a SELECT query with OR'ed row filters
		 * for COPY.
		 *
		 * We also need to use this same COPY (SELECT ...) syntax when
		 * generated columns are published, because copy of generated columns
		 * is not supported by the normal COPY.
		 *
		 * NOTE(review): when lrel.natts == 0 this emits "SELECT  FROM ...",
		 * which relies on the publisher accepting an empty target list.
		 */
		appendStringInfoString(&cmd, "COPY (SELECT ");
		for (int i = 0; i < lrel.natts; i++)
		{
			appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
			if (i < lrel.natts - 1)
				appendStringInfoString(&cmd, ", ");
		}

		appendStringInfoString(&cmd, " FROM ");

		/*
		 * For regular tables, make sure we don't copy data from a child that
		 * inherits the named table as those will be copied separately.
		 */
		if (lrel.relkind == RELKIND_RELATION)
			appendStringInfoString(&cmd, "ONLY ");

		appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
		/* list of OR'ed filters */
		if (qual != NIL)
		{
			ListCell   *lc;
			char	   *q = strVal(linitial(qual));

			appendStringInfo(&cmd, " WHERE %s", q);
			for_each_from(lc, qual, 1)
			{
				q = strVal(lfirst(lc));
				appendStringInfo(&cmd, " OR %s", q);
			}

			/*
			 * The filter strings are no longer needed once embedded in cmd.
			 * The first branch above is only taken when qual == NIL, so this
			 * is the only place qual needs freeing.
			 */
			list_free_deep(qual);
		}

		appendStringInfoString(&cmd, ") TO STDOUT");
	}

	/*
	 * Prior to v16, initial table synchronization will use text format even
	 * if the binary option is enabled for a subscription.
	 *
	 * The same format must be requested both in the remote COPY command and
	 * in the local COPY FROM options, so both are set together here.
	 */
	if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
		MySubscription->binary)
	{
		appendStringInfoString(&cmd, " WITH (FORMAT binary)");
		options = list_make1(makeDefElem("format",
										 (Node *) makeString("binary"), -1));
	}

	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
	pfree(cmd.data);
	if (res->status != WALRCV_OK_COPY_OUT)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_FAILURE),
				 errmsg("could not start initial contents copy for table \"%s.%s\": %s",
						lrel.nspname, lrel.relname, res->err)));
	walrcv_clear_result(res);

	/*
	 * copybuf is a file-level buffer, presumably consumed by copy_read_data
	 * while the COPY stream is read; its declaration is outside this view.
	 */
	copybuf = makeStringInfo();

	pstate = make_parsestate(NULL);
	(void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
										 NULL, false, false);

	attnamelist = make_copy_attnamelist(relmapentry);
	cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);

	/* Do the copy */
	(void) CopyFrom(cstate);

	logicalrep_rel_close(relmapentry, NoLock);
}
1260 :
1261 : /*
1262 : * Determine the tablesync slot name.
1263 : *
1264 : * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
1265 : * on slot name length. We append system_identifier to avoid slot_name
1266 : * collision with subscriptions in other clusters. With the current scheme
1267 : * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
1268 : * length of slot_name will be 50.
1269 : *
1270 : * The returned slot name is stored in the supplied buffer (syncslotname) with
1271 : * the given size.
1272 : *
1273 : * Note: We don't use the subscription slot name as part of tablesync slot name
1274 : * because we are responsible for cleaning up these slots and it could become
1275 : * impossible to recalculate what name to cleanup if the subscription slot name
1276 : * had changed.
1277 : */
1278 : void
1279 768 : ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
1280 : char *syncslotname, Size szslot)
1281 : {
1282 768 : snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
1283 : relid, GetSystemIdentifier());
1284 768 : }
1285 :
/*
 * Start syncing the table in the sync worker.
 *
 * If nothing needs to be done to sync the table, we exit the worker without
 * any further action.
 *
 * Otherwise we connect to the publisher, using the tablesync slot name as
 * application_name, and act on the persisted relation state:
 *   - INIT or DATASYNC: drop any leftover tablesync slot (DATASYNC only).
 *     Then create the slot and the replication origin, do the initial COPY
 *     inside a REPEATABLE READ transaction on the publisher, and persist
 *     FINISHEDCOPY.
 *   - FINISHEDCOPY: skip the copy and resume from the progress recorded in
 *     the already-existing replication origin.
 * Either way, the worker then advertises SYNCWAIT in shared memory and waits
 * for the leader apply worker to move it to CATCHUP.
 *
 * *origin_startpos is an output: the LSN returned by slot creation, or the
 * origin's recorded progress after a restart.  The caller uses it as the
 * start point for streaming.
 *
 * The returned slot name is palloc'ed in current memory context.
 */
static char *
LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
{
	char	   *slotname;
	char	   *err;
	char		relstate;
	XLogRecPtr	relstate_lsn;
	Relation	rel;
	AclResult	aclresult;
	WalRcvExecResult *res;
	char		originname[NAMEDATALEN];
	RepOriginId originid;
	UserContext ucxt;
	bool		must_use_password;
	bool		run_as_owner;

	/* Check the state of the table synchronization. */
	StartTransactionCommand();
	relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
									   MyLogicalRepWorker->relid,
									   &relstate_lsn);
	CommitTransactionCommand();

	/* Is the use of a password mandatory? */
	must_use_password = MySubscription->passwordrequired &&
		!MySubscription->ownersuperuser;

	/* Mirror the persisted state into this worker's shared-memory slot. */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
	MyLogicalRepWorker->relstate = relstate;
	MyLogicalRepWorker->relstate_lsn = relstate_lsn;
	SpinLockRelease(&MyLogicalRepWorker->relmutex);

	/*
	 * If synchronization is already done or no longer necessary, exit now
	 * that we've updated shared memory state.
	 *
	 * There is intentionally no default: every other state falls out of the
	 * switch and continues below (see the Assert after connecting).
	 */
	switch (relstate)
	{
		case SUBREL_STATE_SYNCDONE:
		case SUBREL_STATE_READY:
		case SUBREL_STATE_UNKNOWN:
			finish_sync_worker();	/* doesn't return */
	}

	/* Calculate the name of the tablesync slot. */
	slotname = (char *) palloc(NAMEDATALEN);
	ReplicationSlotNameForTablesync(MySubscription->oid,
									MyLogicalRepWorker->relid,
									slotname,
									NAMEDATALEN);

	/*
	 * Here we use the slot name instead of the subscription name as the
	 * application_name, so that it is different from the leader apply worker,
	 * so that synchronous replication can distinguish them.
	 */
	LogRepWorkerWalRcvConn =
		walrcv_connect(MySubscription->conninfo, true, true,
					   must_use_password,
					   slotname, &err);
	if (LogRepWorkerWalRcvConn == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_FAILURE),
				 errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
						MySubscription->name, err)));

	Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
		   MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);

	/* Assign the origin tracking record name. */
	ReplicationOriginNameForLogicalRep(MySubscription->oid,
									   MyLogicalRepWorker->relid,
									   originname,
									   sizeof(originname));

	if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
	{
		/*
		 * We have previously errored out before finishing the copy so the
		 * replication slot might exist. We want to remove the slot if it
		 * already exists and proceed.
		 *
		 * XXX We could also instead try to drop the slot, last time we failed
		 * but for that, we might need to clean up the copy state as it might
		 * be in the middle of fetching the rows. Also, if there is a network
		 * breakdown then it wouldn't have succeeded so trying it next time
		 * seems like a better bet.
		 */
		ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
	}
	else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
	{
		/*
		 * The COPY phase was previously done, but tablesync then crashed
		 * before it was able to finish normally.
		 */
		StartTransactionCommand();

		/*
		 * The origin tracking name must already exist. It was created first
		 * time this tablesync was launched.
		 */
		originid = replorigin_by_name(originname, false);
		replorigin_session_setup(originid, 0);
		replorigin_session_origin = originid;
		*origin_startpos = replorigin_session_get_progress(false);

		CommitTransactionCommand();

		/* Skip slot creation and the COPY; go straight to SYNCWAIT. */
		goto copy_table_done;
	}

	/* Advertise DATASYNC in shared memory before persisting it. */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
	MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
	MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
	SpinLockRelease(&MyLogicalRepWorker->relmutex);

	/* Update the state and make it visible to others. */
	StartTransactionCommand();
	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
							   MyLogicalRepWorker->relid,
							   MyLogicalRepWorker->relstate,
							   MyLogicalRepWorker->relstate_lsn);
	CommitTransactionCommand();
	pgstat_report_stat(true);

	/*
	 * This transaction spans the COPY and the FINISHEDCOPY catalog update
	 * below, which are committed together.
	 */
	StartTransactionCommand();

	/*
	 * Use a standard write lock here. It might be better to disallow access
	 * to the table while it's being synchronized. But we don't want to block
	 * the main apply process from working and it has to open the relation in
	 * RowExclusiveLock when remapping remote relation id to local one.
	 */
	rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);

	/*
	 * Start a transaction in the remote node in REPEATABLE READ mode. This
	 * ensures that both the replication slot we create (see below) and the
	 * COPY are consistent with each other.
	 */
	res = walrcv_exec(LogRepWorkerWalRcvConn,
					  "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
					  0, NULL);
	if (res->status != WALRCV_OK_COMMAND)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_FAILURE),
				 errmsg("table copy could not start transaction on publisher: %s",
						res->err)));
	walrcv_clear_result(res);

	/*
	 * Create a new permanent logical decoding slot. This slot will be used
	 * for the catchup phase after COPY is done, so tell it to use the
	 * snapshot to make the final data consistent.
	 *
	 * The slot's start LSN is written to *origin_startpos.
	 */
	walrcv_create_slot(LogRepWorkerWalRcvConn,
					   slotname, false /* permanent */ , false /* two_phase */ ,
					   MySubscription->failover,
					   CRS_USE_SNAPSHOT, origin_startpos);

	/*
	 * Setup replication origin tracking. The purpose of doing this before the
	 * copy is to avoid doing the copy again due to any error in setting up
	 * origin tracking.
	 */
	originid = replorigin_by_name(originname, true);
	if (!OidIsValid(originid))
	{
		/*
		 * Origin tracking does not exist, so create it now.
		 *
		 * Then advance to the LSN got from walrcv_create_slot. This is WAL
		 * logged for the purpose of recovery. Locks are to prevent the
		 * replication origin from vanishing while advancing.
		 */
		originid = replorigin_create(originname);

		LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
		replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
						   true /* go backward */ , true /* WAL log */ );
		UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);

		replorigin_session_setup(originid, 0);
		replorigin_session_origin = originid;
	}
	else
	{
		/*
		 * In INIT/DATASYNC an origin should not exist yet. It is only
		 * expected to exist in FINISHEDCOPY, which is handled above.
		 */
		ereport(ERROR,
				(errcode(ERRCODE_DUPLICATE_OBJECT),
				 errmsg("replication origin \"%s\" already exists",
						originname)));
	}

	/*
	 * Make sure that the copy command runs as the table owner, unless the
	 * user has opted out of that behaviour.
	 */
	run_as_owner = MySubscription->runasowner;
	if (!run_as_owner)
		SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);

	/*
	 * Check that our table sync worker has permission to insert into the
	 * target table.
	 */
	aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
								  ACL_INSERT);
	if (aclresult != ACLCHECK_OK)
		aclcheck_error(aclresult,
					   get_relkind_objtype(rel->rd_rel->relkind),
					   RelationGetRelationName(rel));

	/*
	 * COPY FROM does not honor RLS policies. That is not a problem for
	 * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
	 * who has it implicitly), but other roles should not be able to
	 * circumvent RLS. Disallow logical replication into RLS enabled
	 * relations for such roles.
	 */
	if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
						GetUserNameFromId(GetUserId(), true),
						RelationGetRelationName(rel))));

	/* Now do the initial data copy */
	PushActiveSnapshot(GetTransactionSnapshot());
	copy_table(rel);
	PopActiveSnapshot();

	/* End the REPEATABLE READ transaction started on the publisher above. */
	res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
	if (res->status != WALRCV_OK_COMMAND)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_FAILURE),
				 errmsg("table copy could not finish transaction on publisher: %s",
						res->err)));
	walrcv_clear_result(res);

	/* Undo the SwitchToUntrustedUser() done above, if any. */
	if (!run_as_owner)
		RestoreUserContext(&ucxt);

	/*
	 * NoLock: the RowExclusiveLock taken by table_open() is kept until the
	 * transaction commits below.
	 */
	table_close(rel, NoLock);

	/* Make the copy visible. */
	CommandCounterIncrement();

	/*
	 * Update the persisted state to indicate the COPY phase is done; make it
	 * visible to others.
	 *
	 * relstate_lsn was last set to InvalidXLogRecPtr by this function when
	 * entering DATASYNC.
	 */
	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
							   MyLogicalRepWorker->relid,
							   SUBREL_STATE_FINISHEDCOPY,
							   MyLogicalRepWorker->relstate_lsn);

	CommitTransactionCommand();

copy_table_done:

	elog(DEBUG1,
		 "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X",
		 originname, LSN_FORMAT_ARGS(*origin_startpos));

	/*
	 * We are done with the initial data synchronization, update the state.
	 *
	 * SYNCWAIT is only published in shared memory here; it is not written to
	 * the catalog by this function.
	 */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
	MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
	MyLogicalRepWorker->relstate_lsn = *origin_startpos;
	SpinLockRelease(&MyLogicalRepWorker->relmutex);

	/*
	 * Finally, wait until the leader apply worker tells us to catch up and
	 * then return to let LogicalRepApplyLoop do it.
	 */
	wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
	return slotname;
}
1575 :
1576 : /*
1577 : * Common code to fetch the up-to-date sync state info into the static lists.
1578 : *
1579 : * Returns true if subscription has 1 or more tables, else false.
1580 : *
1581 : * Note: If this function started the transaction (indicated by the parameter)
1582 : * then it is the caller's responsibility to commit it.
1583 : */
1584 : static bool
1585 8648 : FetchTableStates(bool *started_tx)
1586 : {
1587 : static bool has_subrels = false;
1588 :
1589 8648 : *started_tx = false;
1590 :
1591 8648 : if (table_states_validity != SYNC_TABLE_STATE_VALID)
1592 : {
1593 : MemoryContext oldctx;
1594 : List *rstates;
1595 : ListCell *lc;
1596 : SubscriptionRelState *rstate;
1597 :
1598 1776 : table_states_validity = SYNC_TABLE_STATE_REBUILD_STARTED;
1599 :
1600 : /* Clean the old lists. */
1601 1776 : list_free_deep(table_states_not_ready);
1602 1776 : table_states_not_ready = NIL;
1603 :
1604 1776 : if (!IsTransactionState())
1605 : {
1606 1744 : StartTransactionCommand();
1607 1744 : *started_tx = true;
1608 : }
1609 :
1610 : /* Fetch all non-ready tables. */
1611 1776 : rstates = GetSubscriptionRelations(MySubscription->oid, true);
1612 :
1613 : /* Allocate the tracking info in a permanent memory context. */
1614 1776 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1615 4778 : foreach(lc, rstates)
1616 : {
1617 3002 : rstate = palloc(sizeof(SubscriptionRelState));
1618 3002 : memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
1619 3002 : table_states_not_ready = lappend(table_states_not_ready, rstate);
1620 : }
1621 1776 : MemoryContextSwitchTo(oldctx);
1622 :
1623 : /*
1624 : * Does the subscription have tables?
1625 : *
1626 : * If there were not-READY relations found then we know it does. But
1627 : * if table_states_not_ready was empty we still need to check again to
1628 : * see if there are 0 tables.
1629 : */
1630 2278 : has_subrels = (table_states_not_ready != NIL) ||
1631 502 : HasSubscriptionRelations(MySubscription->oid);
1632 :
1633 : /*
1634 : * If the subscription relation cache has been invalidated since we
1635 : * entered this routine, we still use and return the relations we just
1636 : * finished constructing, to avoid infinite loops, but we leave the
1637 : * table states marked as stale so that we'll rebuild it again on next
1638 : * access. Otherwise, we mark the table states as valid.
1639 : */
1640 1776 : if (table_states_validity == SYNC_TABLE_STATE_REBUILD_STARTED)
1641 1774 : table_states_validity = SYNC_TABLE_STATE_VALID;
1642 : }
1643 :
1644 8648 : return has_subrels;
1645 : }
1646 :
/*
 * Execute the initial sync with error handling. Disable the subscription,
 * if it's required.
 *
 * On an ERROR raised by LogicalRepSyncTableStart():
 *   - if the subscription has disable_on_error set, the subscription is
 *     disabled and the worker exits (DisableSubscriptionAndExit);
 *   - otherwise the transaction is aborted, the error is counted in the
 *     subscription statistics, and the error is re-thrown.
 *
 * On success, *origin_startpos is filled in by LogicalRepSyncTableStart() and
 * *slotname receives a copy of the tablesync slot name.
 *
 * Allocate the slot name in long-lived context on return. Note that we don't
 * handle FATAL errors which are probably because of system resource error and
 * are not repeatable.
 */
static void
start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
{
	/*
	 * Assigned inside PG_TRY but only read after PG_END_TRY on the normal
	 * (non-longjmp) path, so it does not need to be volatile.
	 */
	char	   *sync_slotname = NULL;

	Assert(am_tablesync_worker());

	PG_TRY();
	{
		/* Call initial sync. */
		sync_slotname = LogicalRepSyncTableStart(origin_startpos);
	}
	PG_CATCH();
	{
		if (MySubscription->disableonerr)
			DisableSubscriptionAndExit();
		else
		{
			/*
			 * Report the worker failed during table synchronization. Abort
			 * the current transaction so that the stats message is sent in an
			 * idle state.
			 */
			AbortOutOfAnyTransaction();
			pgstat_report_subscription_error(MySubscription->oid, false);

			PG_RE_THROW();
		}
	}
	PG_END_TRY();

	/*
	 * allocate slot name in long-lived context.  The name returned by
	 * LogicalRepSyncTableStart() is palloc'ed in the current context, so
	 * copy it into ApplyContext and free the original.
	 */
	*slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
	pfree(sync_slotname);
}
1690 :
1691 : /*
1692 : * Runs the tablesync worker.
1693 : *
1694 : * It starts syncing tables. After a successful sync, sets streaming options
1695 : * and starts streaming to catchup with apply worker.
1696 : */
1697 : static void
1698 392 : run_tablesync_worker()
1699 : {
1700 : char originname[NAMEDATALEN];
1701 392 : XLogRecPtr origin_startpos = InvalidXLogRecPtr;
1702 392 : char *slotname = NULL;
1703 : WalRcvStreamOptions options;
1704 :
1705 392 : start_table_sync(&origin_startpos, &slotname);
1706 :
1707 366 : ReplicationOriginNameForLogicalRep(MySubscription->oid,
1708 366 : MyLogicalRepWorker->relid,
1709 : originname,
1710 : sizeof(originname));
1711 :
1712 366 : set_apply_error_context_origin(originname);
1713 :
1714 366 : set_stream_options(&options, slotname, &origin_startpos);
1715 :
1716 366 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
1717 :
1718 : /* Apply the changes till we catchup with the apply worker. */
1719 366 : start_apply(origin_startpos);
1720 0 : }
1721 :
1722 : /* Logical Replication Tablesync worker entry point */
1723 : void
1724 392 : TablesyncWorkerMain(Datum main_arg)
1725 : {
1726 392 : int worker_slot = DatumGetInt32(main_arg);
1727 :
1728 392 : SetupApplyOrSyncWorker(worker_slot);
1729 :
1730 392 : run_tablesync_worker();
1731 :
1732 0 : finish_sync_worker();
1733 : }
1734 :
1735 : /*
1736 : * If the subscription has no tables then return false.
1737 : *
1738 : * Otherwise, are all tablesyncs READY?
1739 : *
1740 : * Note: This function is not suitable to be called from outside of apply or
1741 : * tablesync workers because MySubscription needs to be already initialized.
1742 : */
1743 : bool
1744 234 : AllTablesyncsReady(void)
1745 : {
1746 234 : bool started_tx = false;
1747 234 : bool has_subrels = false;
1748 :
1749 : /* We need up-to-date sync state info for subscription tables here. */
1750 234 : has_subrels = FetchTableStates(&started_tx);
1751 :
1752 234 : if (started_tx)
1753 : {
1754 30 : CommitTransactionCommand();
1755 30 : pgstat_report_stat(true);
1756 : }
1757 :
1758 : /*
1759 : * Return false when there are no tables in subscription or not all tables
1760 : * are in ready state; true otherwise.
1761 : */
1762 234 : return has_subrels && (table_states_not_ready == NIL);
1763 : }
1764 :
1765 : /*
1766 : * Update the two_phase state of the specified subscription in pg_subscription.
1767 : */
1768 : void
1769 16 : UpdateTwoPhaseState(Oid suboid, char new_state)
1770 : {
1771 : Relation rel;
1772 : HeapTuple tup;
1773 : bool nulls[Natts_pg_subscription];
1774 : bool replaces[Natts_pg_subscription];
1775 : Datum values[Natts_pg_subscription];
1776 :
1777 : Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
1778 : new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1779 : new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1780 :
1781 16 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1782 16 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
1783 16 : if (!HeapTupleIsValid(tup))
1784 0 : elog(ERROR,
1785 : "cache lookup failed for subscription oid %u",
1786 : suboid);
1787 :
1788 : /* Form a new tuple. */
1789 16 : memset(values, 0, sizeof(values));
1790 16 : memset(nulls, false, sizeof(nulls));
1791 16 : memset(replaces, false, sizeof(replaces));
1792 :
1793 : /* And update/set two_phase state */
1794 16 : values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1795 16 : replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1796 :
1797 16 : tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1798 : values, nulls, replaces);
1799 16 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1800 :
1801 16 : heap_freetuple(tup);
1802 16 : table_close(rel, RowExclusiveLock);
1803 16 : }
|