Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * tablesync.c
3 : * PostgreSQL logical replication: initial table data synchronization
4 : *
5 : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/tablesync.c
9 : *
10 : * NOTES
11 : * This file contains code for initial table data synchronization for
12 : * logical replication.
13 : *
14 : * The initial data synchronization is done separately for each table,
15 : * in a separate apply worker that only fetches the initial snapshot data
16 : * from the publisher and then synchronizes the position in the stream with
17 : * the leader apply worker.
18 : *
19 : * There are several reasons for doing the synchronization this way:
20 : * - It allows us to parallelize the initial data synchronization
21 : * which lowers the time needed for it to happen.
22 : * - The initial synchronization does not have to hold the xid and LSN
23 : * for the time it takes to copy data of all tables, causing less
24 : * bloat and lower disk consumption compared to doing the
25 : * synchronization in a single process for the whole database.
26 : * - It allows us to synchronize any tables added after the initial
27 : * synchronization has finished.
28 : *
29 : * The stream position synchronization works in multiple steps:
30 : * - Apply worker requests a tablesync worker to start, setting the new
31 : * table state to INIT.
32 : * - Tablesync worker starts; changes table state from INIT to DATASYNC while
33 : * copying.
34 : * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
35 : * worker specific) state to indicate when the copy phase has completed, so
36 : * if the worker crashes with this (non-memory) state then the copy will not
37 : * be re-attempted.
38 : * - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
39 : * - Apply worker periodically checks for tables in SYNCWAIT state. When
40 : * any appear, it sets the table state to CATCHUP and starts loop-waiting
41 : * until either the table state is set to SYNCDONE or the sync worker
42 : * exits.
43 : * - After the sync worker has seen the state change to CATCHUP, it will
44 : * read the stream and apply changes (acting like an apply worker) until
45 : * it catches up to the specified stream position. Then it sets the
46 : * state to SYNCDONE. There might be zero changes applied between
47 : * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
48 : * apply worker.
49 : * - Once the state is set to SYNCDONE, the apply will continue tracking
50 : * the table until it reaches the SYNCDONE stream position, at which
51 : * point it sets state to READY and stops tracking. Again, there might
52 : * be zero changes in between.
53 : *
54 : * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
55 : * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
56 : *
57 : * The catalog pg_subscription_rel is used to keep information about
58 : * subscribed tables and their state. The catalog holds all states
59 : * except SYNCWAIT and CATCHUP which are only in shared memory.
60 : *
61 : * Example flows look like this:
62 : * - Apply is in front:
63 : * sync:8
64 : * -> set in catalog FINISHEDCOPY
65 : * -> set in memory SYNCWAIT
66 : * apply:10
67 : * -> set in memory CATCHUP
68 : * -> enter wait-loop
69 : * sync:10
70 : * -> set in catalog SYNCDONE
71 : * -> exit
72 : * apply:10
73 : * -> exit wait-loop
74 : * -> continue rep
75 : * apply:11
76 : * -> set in catalog READY
77 : *
78 : * - Sync is in front:
79 : * sync:10
80 : * -> set in catalog FINISHEDCOPY
81 : * -> set in memory SYNCWAIT
82 : * apply:8
83 : * -> set in memory CATCHUP
84 : * -> continue per-table filtering
85 : * sync:10
86 : * -> set in catalog SYNCDONE
87 : * -> exit
88 : * apply:10
89 : * -> set in catalog READY
90 : * -> stop per-table filtering
91 : * -> continue rep
92 : *-------------------------------------------------------------------------
93 : */
94 :
95 : #include "postgres.h"
96 :
97 : #include "access/table.h"
98 : #include "access/xact.h"
99 : #include "catalog/indexing.h"
100 : #include "catalog/pg_subscription_rel.h"
101 : #include "catalog/pg_type.h"
102 : #include "commands/copy.h"
103 : #include "miscadmin.h"
104 : #include "nodes/makefuncs.h"
105 : #include "parser/parse_relation.h"
106 : #include "pgstat.h"
107 : #include "replication/logicallauncher.h"
108 : #include "replication/logicalrelation.h"
109 : #include "replication/logicalworker.h"
110 : #include "replication/origin.h"
111 : #include "replication/slot.h"
112 : #include "replication/walreceiver.h"
113 : #include "replication/worker_internal.h"
114 : #include "storage/ipc.h"
115 : #include "storage/lmgr.h"
116 : #include "utils/acl.h"
117 : #include "utils/array.h"
118 : #include "utils/builtins.h"
119 : #include "utils/lsyscache.h"
120 : #include "utils/memutils.h"
121 : #include "utils/rls.h"
122 : #include "utils/snapmgr.h"
123 : #include "utils/syscache.h"
124 : #include "utils/usercontext.h"
125 :
/*
 * Validity of the cached list of not-yet-READY subscription tables
 * (table_states_not_ready).
 */
typedef enum
{
	/* The cached list is stale and has to be rebuilt before use. */
	SYNC_TABLE_STATE_NEEDS_REBUILD = 0,

	/* A rebuild of the cached list has been started. */
	SYNC_TABLE_STATE_REBUILD_STARTED = 1,

	/* The cached list is up to date. */
	SYNC_TABLE_STATE_VALID = 2,
} SyncingTablesState;
132 :
133 : static SyncingTablesState table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
134 : static List *table_states_not_ready = NIL;
135 : static bool FetchTableStates(bool *started_tx);
136 :
137 : static StringInfo copybuf = NULL;
138 :
139 : /*
140 : * Exit routine for synchronization worker.
141 : */
142 : static void
143 : pg_attribute_noreturn()
144 348 : finish_sync_worker(void)
145 : {
146 : /*
147 : * Commit any outstanding transaction. This is the usual case, unless
148 : * there was nothing to do for the table.
149 : */
150 348 : if (IsTransactionState())
151 : {
152 348 : CommitTransactionCommand();
153 348 : pgstat_report_stat(true);
154 : }
155 :
156 : /* And flush all writes. */
157 348 : XLogFlush(GetXLogWriteRecPtr());
158 :
159 348 : StartTransactionCommand();
160 348 : ereport(LOG,
161 : (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
162 : MySubscription->name,
163 : get_rel_name(MyLogicalRepWorker->relid))));
164 348 : CommitTransactionCommand();
165 :
166 : /* Find the leader apply worker and signal it. */
167 348 : logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
168 :
169 : /* Stop gracefully */
170 348 : proc_exit(0);
171 : }
172 :
173 : /*
174 : * Wait until the relation sync state is set in the catalog to the expected
175 : * one; return true when it happens.
176 : *
177 : * Returns false if the table sync worker or the table itself have
178 : * disappeared, or the table state has been reset.
179 : *
180 : * Currently, this is used in the apply worker when transitioning from
181 : * CATCHUP state to SYNCDONE.
182 : */
183 : static bool
184 696 : wait_for_relation_state_change(Oid relid, char expected_state)
185 : {
186 : char state;
187 :
188 : for (;;)
189 352 : {
190 : LogicalRepWorker *worker;
191 : XLogRecPtr statelsn;
192 :
193 696 : CHECK_FOR_INTERRUPTS();
194 :
195 696 : InvalidateCatalogSnapshot();
196 696 : state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
197 : relid, &statelsn);
198 :
199 696 : if (state == SUBREL_STATE_UNKNOWN)
200 0 : break;
201 :
202 696 : if (state == expected_state)
203 344 : return true;
204 :
205 : /* Check if the sync worker is still running and bail if not. */
206 352 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
207 352 : worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
208 : false);
209 352 : LWLockRelease(LogicalRepWorkerLock);
210 352 : if (!worker)
211 0 : break;
212 :
213 352 : (void) WaitLatch(MyLatch,
214 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
215 : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
216 :
217 352 : ResetLatch(MyLatch);
218 : }
219 :
220 0 : return false;
221 : }
222 :
223 : /*
224 : * Wait until the apply worker changes the state of our synchronization
225 : * worker to the expected one.
226 : *
227 : * Used when transitioning from SYNCWAIT state to CATCHUP.
228 : *
229 : * Returns false if the apply worker has disappeared.
230 : */
231 : static bool
232 698 : wait_for_worker_state_change(char expected_state)
233 : {
234 : int rc;
235 :
236 : for (;;)
237 350 : {
238 : LogicalRepWorker *worker;
239 :
240 698 : CHECK_FOR_INTERRUPTS();
241 :
242 : /*
243 : * Done if already in correct state. (We assume this fetch is atomic
244 : * enough to not give a misleading answer if we do it with no lock.)
245 : */
246 698 : if (MyLogicalRepWorker->relstate == expected_state)
247 348 : return true;
248 :
249 : /*
250 : * Bail out if the apply worker has died, else signal it we're
251 : * waiting.
252 : */
253 350 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
254 350 : worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
255 : InvalidOid, false);
256 350 : if (worker && worker->proc)
257 350 : logicalrep_worker_wakeup_ptr(worker);
258 350 : LWLockRelease(LogicalRepWorkerLock);
259 350 : if (!worker)
260 0 : break;
261 :
262 : /*
263 : * Wait. We expect to get a latch signal back from the apply worker,
264 : * but use a timeout in case it dies without sending one.
265 : */
266 350 : rc = WaitLatch(MyLatch,
267 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
268 : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
269 :
270 350 : if (rc & WL_LATCH_SET)
271 350 : ResetLatch(MyLatch);
272 : }
273 :
274 0 : return false;
275 : }
276 :
277 : /*
278 : * Callback from syscache invalidation.
279 : */
280 : void
281 3206 : invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
282 : {
283 3206 : table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
284 3206 : }
285 :
286 : /*
287 : * Handle table synchronization cooperation from the synchronization
288 : * worker.
289 : *
290 : * If the sync worker is in CATCHUP state and reached (or passed) the
291 : * predetermined synchronization point in the WAL stream, mark the table as
292 : * SYNCDONE and finish.
293 : */
294 : static void
295 376 : process_syncing_tables_for_sync(XLogRecPtr current_lsn)
296 : {
297 376 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
298 :
299 376 : if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
300 376 : current_lsn >= MyLogicalRepWorker->relstate_lsn)
301 : {
302 : TimeLineID tli;
303 348 : char syncslotname[NAMEDATALEN] = {0};
304 348 : char originname[NAMEDATALEN] = {0};
305 :
306 348 : MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
307 348 : MyLogicalRepWorker->relstate_lsn = current_lsn;
308 :
309 348 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
310 :
311 : /*
312 : * UpdateSubscriptionRelState must be called within a transaction.
313 : */
314 348 : if (!IsTransactionState())
315 348 : StartTransactionCommand();
316 :
317 348 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
318 348 : MyLogicalRepWorker->relid,
319 348 : MyLogicalRepWorker->relstate,
320 348 : MyLogicalRepWorker->relstate_lsn);
321 :
322 : /*
323 : * End streaming so that LogRepWorkerWalRcvConn can be used to drop
324 : * the slot.
325 : */
326 348 : walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
327 :
328 : /*
329 : * Cleanup the tablesync slot.
330 : *
331 : * This has to be done after updating the state because otherwise if
332 : * there is an error while doing the database operations we won't be
333 : * able to rollback dropped slot.
334 : */
335 348 : ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
336 348 : MyLogicalRepWorker->relid,
337 : syncslotname,
338 : sizeof(syncslotname));
339 :
340 : /*
341 : * It is important to give an error if we are unable to drop the slot,
342 : * otherwise, it won't be dropped till the corresponding subscription
343 : * is dropped. So passing missing_ok = false.
344 : */
345 348 : ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
346 :
347 348 : CommitTransactionCommand();
348 348 : pgstat_report_stat(false);
349 :
350 : /*
351 : * Start a new transaction to clean up the tablesync origin tracking.
352 : * This transaction will be ended within the finish_sync_worker().
353 : * Now, even, if we fail to remove this here, the apply worker will
354 : * ensure to clean it up afterward.
355 : *
356 : * We need to do this after the table state is set to SYNCDONE.
357 : * Otherwise, if an error occurs while performing the database
358 : * operation, the worker will be restarted and the in-memory state of
359 : * replication progress (remote_lsn) won't be rolled-back which would
360 : * have been cleared before restart. So, the restarted worker will use
361 : * invalid replication progress state resulting in replay of
362 : * transactions that have already been applied.
363 : */
364 348 : StartTransactionCommand();
365 :
366 348 : ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
367 348 : MyLogicalRepWorker->relid,
368 : originname,
369 : sizeof(originname));
370 :
371 : /*
372 : * Resetting the origin session removes the ownership of the slot.
373 : * This is needed to allow the origin to be dropped.
374 : */
375 348 : replorigin_session_reset();
376 348 : replorigin_session_origin = InvalidRepOriginId;
377 348 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
378 348 : replorigin_session_origin_timestamp = 0;
379 :
380 : /*
381 : * Drop the tablesync's origin tracking if exists.
382 : *
383 : * There is a chance that the user is concurrently performing refresh
384 : * for the subscription where we remove the table state and its origin
385 : * or the apply worker would have removed this origin. So passing
386 : * missing_ok = true.
387 : */
388 348 : replorigin_drop_by_name(originname, true, false);
389 :
390 348 : finish_sync_worker();
391 : }
392 : else
393 28 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
394 28 : }
395 :
396 : /*
397 : * Handle table synchronization cooperation from the apply worker.
398 : *
399 : * Walk over all subscription tables that are individually tracked by the
400 : * apply process (currently, all that have state other than
401 : * SUBREL_STATE_READY) and manage synchronization for them.
402 : *
403 : * If there are tables that need synchronizing and are not being synchronized
404 : * yet, start sync workers for them (if there are free slots for sync
405 : * workers). To prevent starting the sync worker for the same relation at a
406 : * high frequency after a failure, we store its last start time with each sync
407 : * state info. We start the sync worker for the same relation after waiting
408 : * at least wal_retrieve_retry_interval.
409 : *
410 : * For tables that are being synchronized already, check if sync workers
411 : * either need action from the apply worker or have finished. This is the
412 : * SYNCWAIT to CATCHUP transition.
413 : *
414 : * If the synchronization position is reached (SYNCDONE), then the table can
415 : * be marked as READY and is no longer tracked.
416 : */
417 : static void
418 7470 : process_syncing_tables_for_apply(XLogRecPtr current_lsn)
419 : {
420 : struct tablesync_start_time_mapping
421 : {
422 : Oid relid;
423 : TimestampTz last_start_time;
424 : };
425 : static HTAB *last_start_times = NULL;
426 : ListCell *lc;
427 7470 : bool started_tx = false;
428 7470 : bool should_exit = false;
429 :
430 : Assert(!IsTransactionState());
431 :
432 : /* We need up-to-date sync state info for subscription tables here. */
433 7470 : FetchTableStates(&started_tx);
434 :
435 : /*
436 : * Prepare a hash table for tracking last start times of workers, to avoid
437 : * immediate restarts. We don't need it if there are no tables that need
438 : * syncing.
439 : */
440 7470 : if (table_states_not_ready != NIL && !last_start_times)
441 220 : {
442 : HASHCTL ctl;
443 :
444 220 : ctl.keysize = sizeof(Oid);
445 220 : ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
446 220 : last_start_times = hash_create("Logical replication table sync worker start times",
447 : 256, &ctl, HASH_ELEM | HASH_BLOBS);
448 : }
449 :
450 : /*
451 : * Clean up the hash table when we're done with all tables (just to
452 : * release the bit of memory).
453 : */
454 7250 : else if (table_states_not_ready == NIL && last_start_times)
455 : {
456 166 : hash_destroy(last_start_times);
457 166 : last_start_times = NULL;
458 : }
459 :
460 : /*
461 : * Process all tables that are being synchronized.
462 : */
463 10276 : foreach(lc, table_states_not_ready)
464 : {
465 2806 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
466 :
467 2806 : if (rstate->state == SUBREL_STATE_SYNCDONE)
468 : {
469 : /*
470 : * Apply has caught up to the position where the table sync has
471 : * finished. Mark the table as ready so that the apply will just
472 : * continue to replicate it normally.
473 : */
474 344 : if (current_lsn >= rstate->lsn)
475 : {
476 : char originname[NAMEDATALEN];
477 :
478 344 : rstate->state = SUBREL_STATE_READY;
479 344 : rstate->lsn = current_lsn;
480 344 : if (!started_tx)
481 : {
482 14 : StartTransactionCommand();
483 14 : started_tx = true;
484 : }
485 :
486 : /*
487 : * Remove the tablesync origin tracking if exists.
488 : *
489 : * There is a chance that the user is concurrently performing
490 : * refresh for the subscription where we remove the table
491 : * state and its origin or the tablesync worker would have
492 : * already removed this origin. We can't rely on tablesync
493 : * worker to remove the origin tracking as if there is any
494 : * error while dropping we won't restart it to drop the
495 : * origin. So passing missing_ok = true.
496 : */
497 344 : ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
498 : rstate->relid,
499 : originname,
500 : sizeof(originname));
501 344 : replorigin_drop_by_name(originname, true, false);
502 :
503 : /*
504 : * Update the state to READY only after the origin cleanup.
505 : */
506 344 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
507 344 : rstate->relid, rstate->state,
508 : rstate->lsn);
509 : }
510 : }
511 : else
512 : {
513 : LogicalRepWorker *syncworker;
514 :
515 : /*
516 : * Look for a sync worker for this relation.
517 : */
518 2462 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
519 :
520 2462 : syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
521 : rstate->relid, false);
522 :
523 2462 : if (syncworker)
524 : {
525 : /* Found one, update our copy of its state */
526 1124 : SpinLockAcquire(&syncworker->relmutex);
527 1124 : rstate->state = syncworker->relstate;
528 1124 : rstate->lsn = syncworker->relstate_lsn;
529 1124 : if (rstate->state == SUBREL_STATE_SYNCWAIT)
530 : {
531 : /*
532 : * Sync worker is waiting for apply. Tell sync worker it
533 : * can catchup now.
534 : */
535 344 : syncworker->relstate = SUBREL_STATE_CATCHUP;
536 344 : syncworker->relstate_lsn =
537 344 : Max(syncworker->relstate_lsn, current_lsn);
538 : }
539 1124 : SpinLockRelease(&syncworker->relmutex);
540 :
541 : /* If we told worker to catch up, wait for it. */
542 1124 : if (rstate->state == SUBREL_STATE_SYNCWAIT)
543 : {
544 : /* Signal the sync worker, as it may be waiting for us. */
545 344 : if (syncworker->proc)
546 344 : logicalrep_worker_wakeup_ptr(syncworker);
547 :
548 : /* Now safe to release the LWLock */
549 344 : LWLockRelease(LogicalRepWorkerLock);
550 :
551 344 : if (started_tx)
552 : {
553 : /*
554 : * We must commit the existing transaction to release
555 : * the existing locks before entering a busy loop.
556 : * This is required to avoid any undetected deadlocks
557 : * due to any existing lock as deadlock detector won't
558 : * be able to detect the waits on the latch.
559 : */
560 344 : CommitTransactionCommand();
561 344 : pgstat_report_stat(false);
562 : }
563 :
564 : /*
565 : * Enter busy loop and wait for synchronization worker to
566 : * reach expected state (or die trying).
567 : */
568 344 : StartTransactionCommand();
569 344 : started_tx = true;
570 :
571 344 : wait_for_relation_state_change(rstate->relid,
572 : SUBREL_STATE_SYNCDONE);
573 : }
574 : else
575 780 : LWLockRelease(LogicalRepWorkerLock);
576 : }
577 : else
578 : {
579 : /*
580 : * If there is no sync worker for this table yet, count
581 : * running sync workers for this subscription, while we have
582 : * the lock.
583 : */
584 : int nsyncworkers =
585 1338 : logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
586 :
587 : /* Now safe to release the LWLock */
588 1338 : LWLockRelease(LogicalRepWorkerLock);
589 :
590 : /*
591 : * If there are free sync worker slot(s), start a new sync
592 : * worker for the table.
593 : */
594 1338 : if (nsyncworkers < max_sync_workers_per_subscription)
595 : {
596 388 : TimestampTz now = GetCurrentTimestamp();
597 : struct tablesync_start_time_mapping *hentry;
598 : bool found;
599 :
600 388 : hentry = hash_search(last_start_times, &rstate->relid,
601 : HASH_ENTER, &found);
602 :
603 420 : if (!found ||
604 32 : TimestampDifferenceExceeds(hentry->last_start_time, now,
605 : wal_retrieve_retry_interval))
606 : {
607 366 : logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
608 366 : MyLogicalRepWorker->dbid,
609 366 : MySubscription->oid,
610 366 : MySubscription->name,
611 366 : MyLogicalRepWorker->userid,
612 : rstate->relid,
613 : DSM_HANDLE_INVALID);
614 366 : hentry->last_start_time = now;
615 : }
616 : }
617 : }
618 : }
619 : }
620 :
621 7470 : if (started_tx)
622 : {
623 : /*
624 : * Even when the two_phase mode is requested by the user, it remains
625 : * as 'pending' until all tablesyncs have reached READY state.
626 : *
627 : * When this happens, we restart the apply worker and (if the
628 : * conditions are still ok) then the two_phase tri-state will become
629 : * 'enabled' at that time.
630 : *
631 : * Note: If the subscription has no tables then leave the state as
632 : * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
633 : * work.
634 : */
635 1546 : if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
636 : {
637 58 : CommandCounterIncrement(); /* make updates visible */
638 58 : if (AllTablesyncsReady())
639 : {
640 12 : ereport(LOG,
641 : (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
642 : MySubscription->name)));
643 12 : should_exit = true;
644 : }
645 : }
646 :
647 1546 : CommitTransactionCommand();
648 1546 : pgstat_report_stat(true);
649 : }
650 :
651 7470 : if (should_exit)
652 : {
653 : /*
654 : * Reset the last-start time for this worker so that the launcher will
655 : * restart it without waiting for wal_retrieve_retry_interval.
656 : */
657 12 : ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
658 :
659 12 : proc_exit(0);
660 : }
661 7458 : }
662 :
663 : /*
664 : * Process possible state change(s) of tables that are being synchronized.
665 : */
666 : void
667 7890 : process_syncing_tables(XLogRecPtr current_lsn)
668 : {
669 7890 : switch (MyLogicalRepWorker->type)
670 : {
671 44 : case WORKERTYPE_PARALLEL_APPLY:
672 :
673 : /*
674 : * Skip for parallel apply workers because they only operate on
675 : * tables that are in a READY state. See pa_can_start() and
676 : * should_apply_changes_for_rel().
677 : */
678 44 : break;
679 :
680 376 : case WORKERTYPE_TABLESYNC:
681 376 : process_syncing_tables_for_sync(current_lsn);
682 28 : break;
683 :
684 7470 : case WORKERTYPE_APPLY:
685 7470 : process_syncing_tables_for_apply(current_lsn);
686 7458 : break;
687 :
688 0 : case WORKERTYPE_UNKNOWN:
689 : /* Should never happen. */
690 0 : elog(ERROR, "Unknown worker type");
691 : }
692 7530 : }
693 :
694 : /*
695 : * Create list of columns for COPY based on logical relation mapping.
696 : */
697 : static List *
698 364 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
699 : {
700 364 : List *attnamelist = NIL;
701 : int i;
702 :
703 974 : for (i = 0; i < rel->remoterel.natts; i++)
704 : {
705 610 : attnamelist = lappend(attnamelist,
706 610 : makeString(rel->remoterel.attnames[i]));
707 : }
708 :
709 :
710 364 : return attnamelist;
711 : }
712 :
713 : /*
714 : * Data source callback for the COPY FROM, which reads from the remote
715 : * connection and passes the data back to our local COPY.
716 : */
717 : static int
718 27942 : copy_read_data(void *outbuf, int minread, int maxread)
719 : {
720 27942 : int bytesread = 0;
721 : int avail;
722 :
723 : /* If there are some leftover data from previous read, use it. */
724 27942 : avail = copybuf->len - copybuf->cursor;
725 27942 : if (avail)
726 : {
727 0 : if (avail > maxread)
728 0 : avail = maxread;
729 0 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
730 0 : copybuf->cursor += avail;
731 0 : maxread -= avail;
732 0 : bytesread += avail;
733 : }
734 :
735 27944 : while (maxread > 0 && bytesread < minread)
736 : {
737 27944 : pgsocket fd = PGINVALID_SOCKET;
738 : int len;
739 27944 : char *buf = NULL;
740 :
741 : for (;;)
742 : {
743 : /* Try read the data. */
744 27944 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
745 :
746 27944 : CHECK_FOR_INTERRUPTS();
747 :
748 27944 : if (len == 0)
749 2 : break;
750 27942 : else if (len < 0)
751 27942 : return bytesread;
752 : else
753 : {
754 : /* Process the data */
755 27582 : copybuf->data = buf;
756 27582 : copybuf->len = len;
757 27582 : copybuf->cursor = 0;
758 :
759 27582 : avail = copybuf->len - copybuf->cursor;
760 27582 : if (avail > maxread)
761 0 : avail = maxread;
762 27582 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
763 27582 : outbuf = (char *) outbuf + avail;
764 27582 : copybuf->cursor += avail;
765 27582 : maxread -= avail;
766 27582 : bytesread += avail;
767 : }
768 :
769 27582 : if (maxread <= 0 || bytesread >= minread)
770 27582 : return bytesread;
771 : }
772 :
773 : /*
774 : * Wait for more data or latch.
775 : */
776 2 : (void) WaitLatchOrSocket(MyLatch,
777 : WL_SOCKET_READABLE | WL_LATCH_SET |
778 : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
779 : fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
780 :
781 2 : ResetLatch(MyLatch);
782 : }
783 :
784 0 : return bytesread;
785 : }
786 :
787 :
788 : /*
789 : * Get information about remote relation in similar fashion the RELATION
790 : * message provides during replication.
791 : *
792 : * This function also returns (a) the relation qualifications to be used in
793 : * the COPY command, and (b) whether the remote relation has published any
794 : * generated column.
795 : */
796 : static void
797 368 : fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
798 : List **qual, bool *gencol_published)
799 : {
800 : WalRcvExecResult *res;
801 : StringInfoData cmd;
802 : TupleTableSlot *slot;
803 368 : Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
804 368 : Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
805 368 : Oid qualRow[] = {TEXTOID};
806 : bool isnull;
807 : int natt;
808 368 : StringInfo pub_names = NULL;
809 368 : Bitmapset *included_cols = NULL;
810 368 : int server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
811 :
812 368 : lrel->nspname = nspname;
813 368 : lrel->relname = relname;
814 :
815 : /* First fetch Oid and replica identity. */
816 368 : initStringInfo(&cmd);
817 368 : appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
818 : " FROM pg_catalog.pg_class c"
819 : " INNER JOIN pg_catalog.pg_namespace n"
820 : " ON (c.relnamespace = n.oid)"
821 : " WHERE n.nspname = %s"
822 : " AND c.relname = %s",
823 : quote_literal_cstr(nspname),
824 : quote_literal_cstr(relname));
825 368 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
826 : lengthof(tableRow), tableRow);
827 :
828 366 : if (res->status != WALRCV_OK_TUPLES)
829 0 : ereport(ERROR,
830 : (errcode(ERRCODE_CONNECTION_FAILURE),
831 : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
832 : nspname, relname, res->err)));
833 :
834 366 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
835 366 : if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
836 0 : ereport(ERROR,
837 : (errcode(ERRCODE_UNDEFINED_OBJECT),
838 : errmsg("table \"%s.%s\" not found on publisher",
839 : nspname, relname)));
840 :
841 366 : lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
842 : Assert(!isnull);
843 366 : lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
844 : Assert(!isnull);
845 366 : lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
846 : Assert(!isnull);
847 :
848 366 : ExecDropSingleTupleTableSlot(slot);
849 366 : walrcv_clear_result(res);
850 :
851 :
852 : /*
853 : * Get column lists for each relation.
854 : *
855 : * We need to do this before fetching info about column names and types,
856 : * so that we can skip columns that should not be replicated.
857 : */
858 366 : if (server_version >= 150000)
859 : {
860 : WalRcvExecResult *pubres;
861 : TupleTableSlot *tslot;
862 366 : Oid attrsRow[] = {INT2VECTOROID};
863 :
864 : /* Build the pub_names comma-separated string. */
865 366 : pub_names = makeStringInfo();
866 366 : GetPublicationsStr(MySubscription->publications, pub_names, true);
867 :
868 : /*
869 : * Fetch info about column lists for the relation (from all the
870 : * publications).
871 : */
872 366 : resetStringInfo(&cmd);
873 366 : appendStringInfo(&cmd,
874 : "SELECT DISTINCT"
875 : " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
876 : " THEN NULL ELSE gpt.attrs END)"
877 : " FROM pg_publication p,"
878 : " LATERAL pg_get_publication_tables(p.pubname) gpt,"
879 : " pg_class c"
880 : " WHERE gpt.relid = %u AND c.oid = gpt.relid"
881 : " AND p.pubname IN ( %s )",
882 : lrel->remoteid,
883 : pub_names->data);
884 :
885 366 : pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
886 : lengthof(attrsRow), attrsRow);
887 :
888 366 : if (pubres->status != WALRCV_OK_TUPLES)
889 0 : ereport(ERROR,
890 : (errcode(ERRCODE_CONNECTION_FAILURE),
891 : errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
892 : nspname, relname, pubres->err)));
893 :
894 : /*
895 : * We don't support the case where the column list is different for
896 : * the same table when combining publications. See comments atop
897 : * fetch_table_list. So there should be only one row returned.
898 : * Although we already checked this when creating the subscription, we
899 : * still need to check here in case the column list was changed after
900 : * creating the subscription and before the sync worker is started.
901 : */
902 366 : if (tuplestore_tuple_count(pubres->tuplestore) > 1)
903 0 : ereport(ERROR,
904 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
905 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
906 : nspname, relname));
907 :
908 : /*
909 : * Get the column list and build a single bitmap with the attnums.
910 : *
911 : * If we find a NULL value, it means all the columns should be
912 : * replicated.
913 : */
914 366 : tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
915 366 : if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
916 : {
917 366 : Datum cfval = slot_getattr(tslot, 1, &isnull);
918 :
919 366 : if (!isnull)
920 : {
921 : ArrayType *arr;
922 : int nelems;
923 : int16 *elems;
924 :
925 42 : arr = DatumGetArrayTypeP(cfval);
926 42 : nelems = ARR_DIMS(arr)[0];
927 42 : elems = (int16 *) ARR_DATA_PTR(arr);
928 :
929 112 : for (natt = 0; natt < nelems; natt++)
930 70 : included_cols = bms_add_member(included_cols, elems[natt]);
931 : }
932 :
933 366 : ExecClearTuple(tslot);
934 : }
935 366 : ExecDropSingleTupleTableSlot(tslot);
936 :
937 366 : walrcv_clear_result(pubres);
938 : }
939 :
940 : /*
941 : * Now fetch column names and types.
942 : */
943 366 : resetStringInfo(&cmd);
944 366 : appendStringInfo(&cmd,
945 : "SELECT a.attnum,"
946 : " a.attname,"
947 : " a.atttypid,"
948 : " a.attnum = ANY(i.indkey)");
949 :
950 : /* Generated columns can be replicated since version 18. */
951 366 : if (server_version >= 180000)
952 366 : appendStringInfo(&cmd, ", a.attgenerated != ''");
953 :
954 732 : appendStringInfo(&cmd,
955 : " FROM pg_catalog.pg_attribute a"
956 : " LEFT JOIN pg_catalog.pg_index i"
957 : " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
958 : " WHERE a.attnum > 0::pg_catalog.int2"
959 : " AND NOT a.attisdropped %s"
960 : " AND a.attrelid = %u"
961 : " ORDER BY a.attnum",
962 : lrel->remoteid,
963 366 : (server_version >= 120000 && server_version < 180000 ?
964 : "AND a.attgenerated = ''" : ""),
965 : lrel->remoteid);
966 366 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
967 : server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
968 :
969 366 : if (res->status != WALRCV_OK_TUPLES)
970 0 : ereport(ERROR,
971 : (errcode(ERRCODE_CONNECTION_FAILURE),
972 : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
973 : nspname, relname, res->err)));
974 :
975 : /* We don't know the number of rows coming, so allocate enough space. */
976 366 : lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
977 366 : lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
978 366 : lrel->attkeys = NULL;
979 :
980 : /*
981 : * Store the columns as a list of names. Ignore those that are not
982 : * present in the column list, if there is one.
983 : */
984 366 : natt = 0;
985 366 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
986 1038 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
987 : {
988 : char *rel_colname;
989 : AttrNumber attnum;
990 :
991 672 : attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
992 : Assert(!isnull);
993 :
994 : /* If the column is not in the column list, skip it. */
995 672 : if (included_cols != NULL && !bms_is_member(attnum, included_cols))
996 : {
997 56 : ExecClearTuple(slot);
998 56 : continue;
999 : }
1000 :
1001 616 : rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
1002 : Assert(!isnull);
1003 :
1004 616 : lrel->attnames[natt] = rel_colname;
1005 616 : lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
1006 : Assert(!isnull);
1007 :
1008 616 : if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
1009 206 : lrel->attkeys = bms_add_member(lrel->attkeys, natt);
1010 :
1011 : /* Remember if the remote table has published any generated column. */
1012 616 : if (server_version >= 180000 && !(*gencol_published))
1013 : {
1014 616 : *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
1015 : Assert(!isnull);
1016 : }
1017 :
1018 : /* Should never happen. */
1019 616 : if (++natt >= MaxTupleAttributeNumber)
1020 0 : elog(ERROR, "too many columns in remote table \"%s.%s\"",
1021 : nspname, relname);
1022 :
1023 616 : ExecClearTuple(slot);
1024 : }
1025 366 : ExecDropSingleTupleTableSlot(slot);
1026 :
1027 366 : lrel->natts = natt;
1028 :
1029 366 : walrcv_clear_result(res);
1030 :
1031 : /*
1032 : * Get relation's row filter expressions. DISTINCT avoids the same
1033 : * expression of a table in multiple publications from being included
1034 : * multiple times in the final expression.
1035 : *
1036 : * We need to copy the row even if it matches just one of the
1037 : * publications, so we later combine all the quals with OR.
1038 : *
1039 : * For initial synchronization, row filtering can be ignored in following
1040 : * cases:
1041 : *
1042 : * 1) one of the subscribed publications for the table hasn't specified
1043 : * any row filter
1044 : *
1045 : * 2) one of the subscribed publications has puballtables set to true
1046 : *
1047 : * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
1048 : * that includes this relation
1049 : */
1050 366 : if (server_version >= 150000)
1051 : {
1052 : /* Reuse the already-built pub_names. */
1053 : Assert(pub_names != NULL);
1054 :
1055 : /* Check for row filters. */
1056 366 : resetStringInfo(&cmd);
1057 366 : appendStringInfo(&cmd,
1058 : "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
1059 : " FROM pg_publication p,"
1060 : " LATERAL pg_get_publication_tables(p.pubname) gpt"
1061 : " WHERE gpt.relid = %u"
1062 : " AND p.pubname IN ( %s )",
1063 : lrel->remoteid,
1064 : pub_names->data);
1065 :
1066 366 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
1067 :
1068 366 : if (res->status != WALRCV_OK_TUPLES)
1069 0 : ereport(ERROR,
1070 : (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
1071 : nspname, relname, res->err)));
1072 :
1073 : /*
1074 : * Multiple row filter expressions for the same table will be combined
1075 : * by COPY using OR. If any of the filter expressions for this table
1076 : * are null, it means the whole table will be copied. In this case it
1077 : * is not necessary to construct a unified row filter expression at
1078 : * all.
1079 : */
1080 366 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
1081 394 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1082 : {
1083 374 : Datum rf = slot_getattr(slot, 1, &isnull);
1084 :
1085 374 : if (!isnull)
1086 28 : *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
1087 : else
1088 : {
1089 : /* Ignore filters and cleanup as necessary. */
1090 346 : if (*qual)
1091 : {
1092 6 : list_free_deep(*qual);
1093 6 : *qual = NIL;
1094 : }
1095 346 : break;
1096 : }
1097 :
1098 28 : ExecClearTuple(slot);
1099 : }
1100 366 : ExecDropSingleTupleTableSlot(slot);
1101 :
1102 366 : walrcv_clear_result(res);
1103 366 : destroyStringInfo(pub_names);
1104 : }
1105 :
1106 366 : pfree(cmd.data);
1107 366 : }
1108 :
1109 : /*
1110 : * Copy existing data of a table from the publisher into the local relation "rel".
1111 : * Fetches the remote relation info, runs COPY ... TO STDOUT on the publisher connection and feeds the result to BeginCopyFrom()/CopyFrom() via copy_read_data.
1112 : * Caller is responsible for locking the local relation; the relmap entry is opened and closed with NoLock here.
1113 : */
1114 : static void
1115 368 : copy_table(Relation rel)
1116 : {
1117 : LogicalRepRelMapEntry *relmapentry;
1118 : LogicalRepRelation lrel;
1119 368 : List *qual = NIL;
1120 : WalRcvExecResult *res;
1121 : StringInfoData cmd;
1122 : CopyFromState cstate;
1123 : List *attnamelist;
1124 : ParseState *pstate;
1125 368 : List *options = NIL;
1126 368 : bool gencol_published = false;
1127 :
1128 : /* Get the publisher relation info, its row filters and the generated-column flag. */
1129 368 : fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
1130 368 : RelationGetRelationName(rel), &lrel, &qual,
1131 : &gencol_published);
1132 :
1133 : /* Put the relation into relmap. */
1134 366 : logicalrep_relmap_update(&lrel);
1135 :
1136 : /* Map the publisher relation to the local one; it must be the relation we were given. */
1137 366 : relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
1138 : Assert(rel == relmapentry->localrel);
1139 :
1140 : /* Build the COPY command to run on the publisher. */
1141 364 : initStringInfo(&cmd);
1142 :
1143 : /* Regular table with no row filter and no published generated column: plain COPY */
1144 364 : if (lrel.relkind == RELKIND_RELATION && qual == NIL && !gencol_published)
1145 : {
1146 310 : appendStringInfo(&cmd, "COPY %s",
1147 310 : quote_qualified_identifier(lrel.nspname, lrel.relname));
1148 :
1149 : /* If the table has columns, then specify the columns */
1150 310 : if (lrel.natts)
1151 : {
1152 308 : appendStringInfoString(&cmd, " (");
1153 :
1154 : /*
1155 : * XXX Do we need to list the columns in all cases? Maybe we're
1156 : * replicating all columns?
1157 : */
1158 838 : for (int i = 0; i < lrel.natts; i++)
1159 : {
1160 530 : if (i > 0)
1161 222 : appendStringInfoString(&cmd, ", ");
1162 :
1163 530 : appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
1164 : }
1165 :
1166 308 : appendStringInfoChar(&cmd, ')');
1167 : }
1168 :
1169 310 : appendStringInfoString(&cmd, " TO STDOUT");
1170 : }
1171 : else
1172 : {
1173 : /*
1174 : * For non-tables and tables with row filters, we need to do COPY
1175 : * (SELECT ...), but we can't just do SELECT * because we may need to
1176 : * copy only a subset of columns including generated columns. For tables
1177 : * with any row filters, build a SELECT query with OR'ed row filters
1178 : * for COPY.
1179 : *
1180 : * We also need to use this same COPY (SELECT ...) syntax when
1181 : * generated columns are published, because copy of generated columns
1182 : * is not supported by the normal COPY.
1183 : */
1184 54 : appendStringInfoString(&cmd, "COPY (SELECT ");
1185 134 : for (int i = 0; i < lrel.natts; i++)
1186 : {
1187 80 : appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
1188 80 : if (i < lrel.natts - 1)
1189 26 : appendStringInfoString(&cmd, ", ");
1190 : }
1191 :
1192 54 : appendStringInfoString(&cmd, " FROM ");
1193 :
1194 : /*
1195 : * For regular tables, make sure we don't copy data from a child that
1196 : * inherits the named table as those will be copied separately.
1197 : */
1198 54 : if (lrel.relkind == RELKIND_RELATION)
1199 20 : appendStringInfoString(&cmd, "ONLY ");
1200 :
1201 54 : appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
1202 : /* Append the row filters, OR'ed together, as the WHERE clause. */
1203 54 : if (qual != NIL)
1204 : {
1205 : ListCell *lc;
1206 20 : char *q = strVal(linitial(qual));
1207 :
1208 20 : appendStringInfo(&cmd, " WHERE %s", q);
1209 22 : for_each_from(lc, qual, 1)
1210 : {
1211 2 : q = strVal(lfirst(lc));
1212 2 : appendStringInfo(&cmd, " OR %s", q);
1213 : }
1214 20 : list_free_deep(qual);
1215 : }
1216 :
1217 54 : appendStringInfoString(&cmd, ") TO STDOUT");
1218 : }
1219 :
1220 : /*
1221 : * Prior to v16, initial table synchronization will use text format even
1222 : * if the binary option is enabled for a subscription.
1223 : */
1224 364 : if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
1225 364 : MySubscription->binary)
1226 : {
1227 10 : appendStringInfoString(&cmd, " WITH (FORMAT binary)");
1228 10 : options = list_make1(makeDefElem("format",
1229 : (Node *) makeString("binary"), -1)); /* same format for the local BeginCopyFrom() below */
1230 : }
1231 :
1232 364 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
1233 364 : pfree(cmd.data);
1234 364 : if (res->status != WALRCV_OK_COPY_OUT)
1235 0 : ereport(ERROR,
1236 : (errcode(ERRCODE_CONNECTION_FAILURE),
1237 : errmsg("could not start initial contents copy for table \"%s.%s\": %s",
1238 : lrel.nspname, lrel.relname, res->err)));
1239 364 : walrcv_clear_result(res);
1240 :
1241 364 : copybuf = makeStringInfo(); /* NOTE(review): copybuf is declared outside this function; presumably consumed by copy_read_data -- confirm */
1242 :
1243 364 : pstate = make_parsestate(NULL);
1244 364 : (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
1245 : NULL, false, false);
1246 :
1247 364 : attnamelist = make_copy_attnamelist(relmapentry);
1248 364 : cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
1249 :
1250 : /* Do the copy */
1251 362 : (void) CopyFrom(cstate);
1252 :
1253 348 : logicalrep_rel_close(relmapentry, NoLock);
1254 348 : }
1255 :
1256 : /*
1257 : * Determine the tablesync slot name for subscription "suboid" and table "relid".
1258 : *
1259 : * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
1260 : * on slot name length. We append the system identifier to avoid slot name
1261 : * collisions with subscriptions in other clusters. With the current scheme
1262 : * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
1263 : * length of slot_name will be 50, counting the terminating '\0'.
1264 : *
1265 : * The slot name is written with snprintf() into the supplied buffer
1266 : * (syncslotname) of size szslot; nothing is returned.
1267 : *
1268 : * Note: We don't use the subscription slot name as part of tablesync slot name
1269 : * because we are responsible for cleaning up these slots and it could become
1270 : * impossible to recalculate which name to clean up if the subscription slot name
1271 : * had changed.
1272 : */
1273 : void
1274 724 : ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
1275 : char *syncslotname, Size szslot)
1276 : {
1277 724 : snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
1278 : relid, GetSystemIdentifier());
1279 724 : }
1280 :
1281 : /*
1282 : * Start syncing the table in the sync worker.
1283 : *
1284 : * If nothing needs to be done to sync the table (state SYNCDONE, READY or
1285 : * UNKNOWN), we exit the worker via finish_sync_worker() without any further action.
1286 : * Otherwise *origin_startpos is set to the start LSN (from walrcv_create_slot, or from the origin's progress when resuming after FINISHEDCOPY).
1287 : * The returned slot name is palloc'ed in current memory context.
1288 : */
1289 : static char *
1290 370 : LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
1291 : {
1292 : char *slotname;
1293 : char *err;
1294 : char relstate;
1295 : XLogRecPtr relstate_lsn;
1296 : Relation rel;
1297 : AclResult aclresult;
1298 : WalRcvExecResult *res;
1299 : char originname[NAMEDATALEN];
1300 : RepOriginId originid;
1301 : UserContext ucxt;
1302 : bool must_use_password;
1303 : bool run_as_owner;
1304 :
1305 : /* Check the state of the table synchronization. */
1306 370 : StartTransactionCommand();
1307 370 : relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
1308 370 : MyLogicalRepWorker->relid,
1309 : &relstate_lsn);
1310 370 : CommitTransactionCommand();
1311 :
1312 : /* Is the use of a password mandatory? */
1313 732 : must_use_password = MySubscription->passwordrequired &&
1314 362 : !MySubscription->ownersuperuser;
1315 :
1316 370 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1317 370 : MyLogicalRepWorker->relstate = relstate;
1318 370 : MyLogicalRepWorker->relstate_lsn = relstate_lsn;
1319 370 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1320 :
1321 : /*
1322 : * If synchronization is already done or no longer necessary, exit now
1323 : * that we've updated shared memory state.
1324 : */
1325 370 : switch (relstate)
1326 : {
1327 0 : case SUBREL_STATE_SYNCDONE:
1328 : case SUBREL_STATE_READY:
1329 : case SUBREL_STATE_UNKNOWN:
1330 0 : finish_sync_worker(); /* doesn't return */
1331 : } /* no default: every other state continues below */
1332 :
1333 : /* Calculate the name of the tablesync slot. */
1334 370 : slotname = (char *) palloc(NAMEDATALEN);
1335 370 : ReplicationSlotNameForTablesync(MySubscription->oid,
1336 370 : MyLogicalRepWorker->relid,
1337 : slotname,
1338 : NAMEDATALEN);
1339 :
1340 : /*
1341 : * Here we use the slot name instead of the subscription name as the
1342 : * application_name, so that it is different from the leader apply worker,
1343 : * so that synchronous replication can distinguish them.
1344 : */
1345 368 : LogRepWorkerWalRcvConn =
1346 370 : walrcv_connect(MySubscription->conninfo, true, true,
1347 : must_use_password,
1348 : slotname, &err);
1349 368 : if (LogRepWorkerWalRcvConn == NULL)
1350 0 : ereport(ERROR,
1351 : (errcode(ERRCODE_CONNECTION_FAILURE),
1352 : errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
1353 : MySubscription->name, err)));
1354 :
1355 : Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
1356 : MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
1357 : MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
1358 :
1359 : /* Assign the origin tracking record name. */
1360 368 : ReplicationOriginNameForLogicalRep(MySubscription->oid,
1361 368 : MyLogicalRepWorker->relid,
1362 : originname,
1363 : sizeof(originname));
1364 :
1365 368 : if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
1366 : {
1367 : /*
1368 : * We have previously errored out before finishing the copy so the
1369 : * replication slot might exist. We want to remove the slot if it
1370 : * already exists and proceed.
1371 : *
1372 : * XXX We could instead try to drop the slot at the time of the
1373 : * failure, but for that we might need to clean up the copy state as it
1374 : * might be in the middle of fetching the rows. Also, if there is a network
1375 : * breakdown then it wouldn't have succeeded so trying it next time
1376 : * seems like a better bet.
1377 : */
1378 14 : ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
1379 : }
1380 354 : else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
1381 : {
1382 : /*
1383 : * The COPY phase was previously done, but tablesync then crashed
1384 : * before it was able to finish normally. Skip the copy and resume from the origin's progress.
1385 : */
1386 0 : StartTransactionCommand();
1387 :
1388 : /*
1389 : * The origin tracking name must already exist. It was created first
1390 : * time this tablesync was launched.
1391 : */
1392 0 : originid = replorigin_by_name(originname, false);
1393 0 : replorigin_session_setup(originid, 0);
1394 0 : replorigin_session_origin = originid;
1395 0 : *origin_startpos = replorigin_session_get_progress(false);
1396 :
1397 0 : CommitTransactionCommand();
1398 :
1399 0 : goto copy_table_done;
1400 : }
1401 :
1402 368 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1403 368 : MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
1404 368 : MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
1405 368 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1406 :
1407 : /* Update the state and make it visible to others. */
1408 368 : StartTransactionCommand();
1409 368 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1410 368 : MyLogicalRepWorker->relid,
1411 368 : MyLogicalRepWorker->relstate,
1412 368 : MyLogicalRepWorker->relstate_lsn);
1413 368 : CommitTransactionCommand();
1414 368 : pgstat_report_stat(true);
1415 :
1416 368 : StartTransactionCommand();
1417 :
1418 : /*
1419 : * Use a standard write lock here. It might be better to disallow access
1420 : * to the table while it's being synchronized. But we don't want to block
1421 : * the main apply process from working and it has to open the relation in
1422 : * RowExclusiveLock when remapping remote relation id to local one.
1423 : */
1424 368 : rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
1425 :
1426 : /*
1427 : * Start a transaction in the remote node in REPEATABLE READ mode. This
1428 : * ensures that both the replication slot we create (see below) and the
1429 : * COPY are consistent with each other.
1430 : */
1431 368 : res = walrcv_exec(LogRepWorkerWalRcvConn,
1432 : "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1433 : 0, NULL);
1434 368 : if (res->status != WALRCV_OK_COMMAND)
1435 0 : ereport(ERROR,
1436 : (errcode(ERRCODE_CONNECTION_FAILURE),
1437 : errmsg("table copy could not start transaction on publisher: %s",
1438 : res->err)));
1439 368 : walrcv_clear_result(res);
1440 :
1441 : /*
1442 : * Create a new permanent logical decoding slot. This slot will be used
1443 : * for the catchup phase after COPY is done, so tell it to use the
1444 : * snapshot to make the final data consistent.
1445 : */
1446 368 : walrcv_create_slot(LogRepWorkerWalRcvConn,
1447 : slotname, false /* permanent */ , false /* two_phase */ ,
1448 : MySubscription->failover,
1449 : CRS_USE_SNAPSHOT, origin_startpos);
1450 :
1451 : /*
1452 : * Setup replication origin tracking. The purpose of doing this before the
1453 : * copy is to avoid doing the copy again due to any error in setting up
1454 : * origin tracking.
1455 : */
1456 368 : originid = replorigin_by_name(originname, true);
1457 368 : if (!OidIsValid(originid))
1458 : {
1459 : /*
1460 : * Origin tracking does not exist, so create it now.
1461 : *
1462 : * Then advance to the LSN got from walrcv_create_slot. This is WAL
1463 : * logged for the purpose of recovery. Locks are to prevent the
1464 : * replication origin from vanishing while advancing.
1465 : */
1466 368 : originid = replorigin_create(originname);
1467 :
1468 368 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1469 368 : replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
1470 : true /* go backward */ , true /* WAL log */ );
1471 368 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1472 :
1473 368 : replorigin_session_setup(originid, 0);
1474 368 : replorigin_session_origin = originid;
1475 : }
1476 : else
1477 : {
1478 0 : ereport(ERROR,
1479 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1480 : errmsg("replication origin \"%s\" already exists",
1481 : originname)));
1482 : }
1483 :
1484 : /*
1485 : * Make sure that the copy command runs as the table owner, unless the
1486 : * user has opted out of that behaviour.
1487 : */
1488 368 : run_as_owner = MySubscription->runasowner;
1489 368 : if (!run_as_owner)
1490 366 : SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);
1491 :
1492 : /*
1493 : * Check that our table sync worker has permission to insert into the
1494 : * target table.
1495 : */
1496 368 : aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
1497 : ACL_INSERT);
1498 368 : if (aclresult != ACLCHECK_OK)
1499 0 : aclcheck_error(aclresult,
1500 0 : get_relkind_objtype(rel->rd_rel->relkind),
1501 0 : RelationGetRelationName(rel));
1502 :
1503 : /*
1504 : * COPY FROM does not honor RLS policies. That is not a problem for
1505 : * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
1506 : * who has it implicitly), but other roles should not be able to
1507 : * circumvent RLS. Disallow logical replication into RLS enabled
1508 : * relations for such roles.
1509 : */
1510 368 : if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
1511 0 : ereport(ERROR,
1512 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1513 : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1514 : GetUserNameFromId(GetUserId(), true),
1515 : RelationGetRelationName(rel))));
1516 :
1517 : /* Now do the initial data copy */
1518 368 : PushActiveSnapshot(GetTransactionSnapshot());
1519 368 : copy_table(rel);
1520 348 : PopActiveSnapshot();
1521 :
1522 348 : res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
1523 348 : if (res->status != WALRCV_OK_COMMAND)
1524 0 : ereport(ERROR,
1525 : (errcode(ERRCODE_CONNECTION_FAILURE),
1526 : errmsg("table copy could not finish transaction on publisher: %s",
1527 : res->err)));
1528 348 : walrcv_clear_result(res);
1529 :
1530 348 : if (!run_as_owner)
1531 346 : RestoreUserContext(&ucxt);
1532 :
1533 348 : table_close(rel, NoLock);
1534 :
1535 : /* Make the copy visible. */
1536 348 : CommandCounterIncrement();
1537 :
1538 : /*
1539 : * Update the persisted state to indicate the COPY phase is done; make it
1540 : * visible to others.
1541 : */
1542 348 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1543 348 : MyLogicalRepWorker->relid,
1544 : SUBREL_STATE_FINISHEDCOPY,
1545 348 : MyLogicalRepWorker->relstate_lsn);
1546 :
1547 348 : CommitTransactionCommand();
1548 :
1549 348 : copy_table_done:
1550 :
1551 348 : elog(DEBUG1,
1552 : "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
1553 : originname, LSN_FORMAT_ARGS(*origin_startpos));
1554 :
1555 : /*
1556 : * We are done with the initial data synchronization, update the state.
1557 : */
1558 348 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1559 348 : MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
1560 348 : MyLogicalRepWorker->relstate_lsn = *origin_startpos;
1561 348 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1562 :
1563 : /*
1564 : * Finally, wait until the leader apply worker tells us to catch up and
1565 : * then return to let LogicalRepApplyLoop do it.
1566 : */
1567 348 : wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
1568 348 : return slotname;
1569 : }
1570 :
1571 : /*
1572 : * Common code to fetch the up-to-date sync state info into the static lists.
1573 : * The rebuild only happens when table_states_validity is not SYNC_TABLE_STATE_VALID.
1574 : * Returns true if subscription has 1 or more tables, else false (the answer is kept in a static variable between rebuilds).
1575 : *
1576 : * Note: If this function started the transaction (*started_tx is set to true)
1577 : * then it is the caller's responsibility to commit it.
1578 : */
1579 : static bool
1580 7608 : FetchTableStates(bool *started_tx)
1581 : {
1582 : static bool has_subrels = false;
1583 :
1584 7608 : *started_tx = false;
1585 :
1586 7608 : if (table_states_validity != SYNC_TABLE_STATE_VALID)
1587 : {
1588 : MemoryContext oldctx;
1589 : List *rstates;
1590 : ListCell *lc;
1591 : SubscriptionRelState *rstate;
1592 :
1593 1594 : table_states_validity = SYNC_TABLE_STATE_REBUILD_STARTED;
1594 :
1595 : /* Clean the old list. */
1596 1594 : list_free_deep(table_states_not_ready);
1597 1594 : table_states_not_ready = NIL;
1598 :
1599 1594 : if (!IsTransactionState())
1600 : {
1601 1558 : StartTransactionCommand();
1602 1558 : *started_tx = true;
1603 : }
1604 :
1605 : /* Fetch all non-ready tables. */
1606 1594 : rstates = GetSubscriptionRelations(MySubscription->oid, true);
1607 :
1608 : /* Copy each entry into CacheMemoryContext so the tracking info is allocated in a permanent memory context. */
1609 1594 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1610 4118 : foreach(lc, rstates)
1611 : {
1612 2524 : rstate = palloc(sizeof(SubscriptionRelState));
1613 2524 : memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
1614 2524 : table_states_not_ready = lappend(table_states_not_ready, rstate);
1615 : }
1616 1594 : MemoryContextSwitchTo(oldctx);
1617 :
1618 : /*
1619 : * Does the subscription have tables?
1620 : *
1621 : * If there were not-READY relations found then we know it does. But
1622 : * if table_states_not_ready was empty we still need to check again to
1623 : * see if there are 0 tables.
1624 : */
1625 2012 : has_subrels = (table_states_not_ready != NIL) ||
1626 418 : HasSubscriptionRelations(MySubscription->oid);
1627 :
1628 : /*
1629 : * If the subscription relation cache has been invalidated since we
1630 : * entered this routine, we still use and return the relations we just
1631 : * finished constructing, to avoid infinite loops, but we leave the
1632 : * table states marked as stale so that we'll rebuild it again on next
1633 : * access. Otherwise, we mark the table states as valid.
1634 : */
1635 1594 : if (table_states_validity == SYNC_TABLE_STATE_REBUILD_STARTED)
1636 1592 : table_states_validity = SYNC_TABLE_STATE_VALID;
1637 : }
1638 :
1639 7608 : return has_subrels;
1640 : }
1641 :
1642 : /*
1643 : * Execute the initial sync with error handling. On error, disable the
1644 : * subscription if disableonerr is set; otherwise report the error and re-throw it.
1645 : *
1646 : * On success, *slotname is a copy of the slot name allocated in the long-lived ApplyContext. Note that we don't
1647 : * handle FATAL errors which are probably because of system resource error and
1648 : * are not repeatable.
1649 : */
1650 : static void
1651 370 : start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
1652 : {
1653 370 : char *sync_slotname = NULL;
1654 :
1655 : Assert(am_tablesync_worker());
1656 :
1657 370 : PG_TRY();
1658 : {
1659 : /* Call initial sync. */
1660 370 : sync_slotname = LogicalRepSyncTableStart(origin_startpos);
1661 : }
1662 18 : PG_CATCH();
1663 : {
1664 18 : if (MySubscription->disableonerr)
1665 2 : DisableSubscriptionAndExit();
1666 : else
1667 : {
1668 : /*
1669 : * Report the worker failed during table synchronization. Abort
1670 : * the current transaction so that the stats message is sent in an
1671 : * idle state.
1672 : */
1673 16 : AbortOutOfAnyTransaction();
1674 16 : pgstat_report_subscription_error(MySubscription->oid, false);
1675 :
1676 16 : PG_RE_THROW();
1677 : }
1678 : }
1679 348 : PG_END_TRY();
1680 :
1681 : /* Copy the slot name into the long-lived context, then free the original. */
1682 348 : *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
1683 348 : pfree(sync_slotname);
1684 348 : }
1685 :
1686 : /*
1687 : * Runs the tablesync worker. Takes no arguments and returns nothing.
1688 : *
1689 : * It starts syncing the table via start_table_sync(). After a successful sync, it sets the
1690 : * streaming options and starts streaming to catch up with the apply worker.
1691 : */
1692 : static void
1693 370 : run_tablesync_worker(void)
1694 : {
1695 : char originname[NAMEDATALEN];
1696 370 : XLogRecPtr origin_startpos = InvalidXLogRecPtr;
1697 370 : char *slotname = NULL;
1698 : WalRcvStreamOptions options;
1699 :
1700 370 : start_table_sync(&origin_startpos, &slotname);
1701 :
1702 348 : ReplicationOriginNameForLogicalRep(MySubscription->oid,
1703 348 : MyLogicalRepWorker->relid,
1704 : originname,
1705 : sizeof(originname));
1706 :
1707 348 : set_apply_error_context_origin(originname);
1708 :
1709 348 : set_stream_options(&options, slotname, &origin_startpos);
1710 :
1711 348 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
1712 :
1713 : /* Apply the changes till we catch up with the apply worker. */
1714 348 : start_apply(origin_startpos);
1715 0 : }
1716 :
1717 : /* Logical Replication Tablesync worker entry point; main_arg carries the worker slot number as an int32 Datum. */
1718 : void
1719 370 : TablesyncWorkerMain(Datum main_arg)
1720 : {
1721 370 : int worker_slot = DatumGetInt32(main_arg);
1722 :
1723 370 : SetupApplyOrSyncWorker(worker_slot);
1724 :
1725 370 : run_tablesync_worker();
1726 :
1727 0 : finish_sync_worker(); /* only reached if run_tablesync_worker() returns */
1728 : }
1729 :
1730 : /*
1731 : * If the subscription has no tables then return false.
1732 : *
1733 : * Otherwise, are all tablesyncs READY? True only if no not-ready table state remains after refreshing the state info.
1734 : *
1735 : * Note: This function is not suitable to be called from outside of apply or
1736 : * tablesync workers because MySubscription needs to be already initialized.
1737 : */
1738 : bool
1739 138 : AllTablesyncsReady(void)
1740 : {
1741 138 : bool started_tx = false;
1742 138 : bool has_subrels = false;
1743 :
1744 : /* We need up-to-date sync state info for subscription tables here. */
1745 138 : has_subrels = FetchTableStates(&started_tx);
1746 :
1747 : /* FetchTableStates() leaves a transaction it started for us to commit. */
1748 138 : if (started_tx)
1749 : {
1750 26 : CommitTransactionCommand();
1751 26 : pgstat_report_stat(true);
1752 : }
1753 :
1754 : /*
1755 : * Return false when there are no tables in subscription or not all tables
1756 : * are in ready state; true otherwise.
1757 : */
1758 138 : return has_subrels && (table_states_not_ready == NIL);
1759 : }
1759 :
1760 : /*
1761 : * Update the two_phase state of the specified subscription in pg_subscription. new_state must be one of the LOGICALREP_TWOPHASE_STATE_* values (asserted); raises ERROR if no row is found for suboid.
1762 : */
1763 : void
1764 14 : UpdateTwoPhaseState(Oid suboid, char new_state)
1765 : {
1766 : Relation rel;
1767 : HeapTuple tup;
1768 : bool nulls[Natts_pg_subscription];
1769 : bool replaces[Natts_pg_subscription];
1770 : Datum values[Natts_pg_subscription];
1771 :
1772 : Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
1773 : new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1774 : new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1775 :
1776 14 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1777 14 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
1778 14 : if (!HeapTupleIsValid(tup))
1779 0 : elog(ERROR,
1780 : "cache lookup failed for subscription oid %u",
1781 : suboid);
1782 :
1783 : /* Form a new tuple. */
1784 14 : memset(values, 0, sizeof(values));
1785 14 : memset(nulls, false, sizeof(nulls));
1786 14 : memset(replaces, false, sizeof(replaces));
1787 :
1788 : /* And update/set two_phase state; this is the only column flagged in replaces[]. */
1789 14 : values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1790 14 : replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1791 :
1792 14 : tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1793 : values, nulls, replaces);
1794 14 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1795 :
1796 14 : heap_freetuple(tup);
1797 14 : table_close(rel, RowExclusiveLock);
1798 14 : }
|