Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * tablesync.c
3 : * PostgreSQL logical replication: initial table data synchronization
4 : *
5 : * Copyright (c) 2012-2024, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/tablesync.c
9 : *
10 : * NOTES
11 : * This file contains code for initial table data synchronization for
12 : * logical replication.
13 : *
14 : * The initial data synchronization is done separately for each table,
15 : * in a separate apply worker that only fetches the initial snapshot data
16 : * from the publisher and then synchronizes the position in the stream with
17 : * the leader apply worker.
18 : *
19 : * There are several reasons for doing the synchronization this way:
20 : * - It allows us to parallelize the initial data synchronization
21 : * which lowers the time needed for it to happen.
22 : * - The initial synchronization does not have to hold the xid and LSN
23 : * for the time it takes to copy data of all tables, causing less
24 : * bloat and lower disk consumption compared to doing the
25 : * synchronization in a single process for the whole database.
26 : * - It allows us to synchronize any tables added after the initial
27 : * synchronization has finished.
28 : *
29 : * The stream position synchronization works in multiple steps:
30 : * - Apply worker requests a tablesync worker to start, setting the new
31 : * table state to INIT.
32 : * - Tablesync worker starts; changes table state from INIT to DATASYNC while
33 : * copying.
34 : * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
35 : * worker specific) state to indicate when the copy phase has completed, so
36 : * if the worker crashes with this (non-memory) state then the copy will not
37 : * be re-attempted.
38 : * - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
39 : * - Apply worker periodically checks for tables in SYNCWAIT state. When
40 : * any appear, it sets the table state to CATCHUP and starts loop-waiting
41 : * until either the table state is set to SYNCDONE or the sync worker
42 : * exits.
43 : * - After the sync worker has seen the state change to CATCHUP, it will
44 : * read the stream and apply changes (acting like an apply worker) until
45 : * it catches up to the specified stream position. Then it sets the
46 : * state to SYNCDONE. There might be zero changes applied between
47 : * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
48 : * apply worker.
49 : * - Once the state is set to SYNCDONE, the apply worker will continue tracking
50 : * the table until it reaches the SYNCDONE stream position, at which
51 : * point it sets state to READY and stops tracking. Again, there might
52 : * be zero changes in between.
53 : *
54 : * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
55 : * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
56 : *
57 : * The catalog pg_subscription_rel is used to keep information about
58 : * subscribed tables and their state. The catalog holds all states
59 : * except SYNCWAIT and CATCHUP which are only in shared memory.
60 : *
61 : * Example flows look like this:
62 : * - Apply is in front:
63 : * sync:8
64 : * -> set in catalog FINISHEDCOPY
65 : * -> set in memory SYNCWAIT
66 : * apply:10
67 : * -> set in memory CATCHUP
68 : * -> enter wait-loop
69 : * sync:10
70 : * -> set in catalog SYNCDONE
71 : * -> exit
72 : * apply:10
73 : * -> exit wait-loop
74 : * -> continue rep
75 : * apply:11
76 : * -> set in catalog READY
77 : *
78 : * - Sync is in front:
79 : * sync:10
80 : * -> set in catalog FINISHEDCOPY
81 : * -> set in memory SYNCWAIT
82 : * apply:8
83 : * -> set in memory CATCHUP
84 : * -> continue per-table filtering
85 : * sync:10
86 : * -> set in catalog SYNCDONE
87 : * -> exit
88 : * apply:10
89 : * -> set in catalog READY
90 : * -> stop per-table filtering
91 : * -> continue rep
92 : *-------------------------------------------------------------------------
93 : */
94 :
95 : #include "postgres.h"
96 :
97 : #include "access/table.h"
98 : #include "access/xact.h"
99 : #include "catalog/indexing.h"
100 : #include "catalog/pg_subscription_rel.h"
101 : #include "catalog/pg_type.h"
102 : #include "commands/copy.h"
103 : #include "miscadmin.h"
104 : #include "nodes/makefuncs.h"
105 : #include "parser/parse_relation.h"
106 : #include "pgstat.h"
107 : #include "replication/logicallauncher.h"
108 : #include "replication/logicalrelation.h"
109 : #include "replication/logicalworker.h"
110 : #include "replication/origin.h"
111 : #include "replication/slot.h"
112 : #include "replication/walreceiver.h"
113 : #include "replication/worker_internal.h"
114 : #include "storage/ipc.h"
115 : #include "storage/lmgr.h"
116 : #include "utils/acl.h"
117 : #include "utils/array.h"
118 : #include "utils/builtins.h"
119 : #include "utils/lsyscache.h"
120 : #include "utils/memutils.h"
121 : #include "utils/rls.h"
122 : #include "utils/snapmgr.h"
123 : #include "utils/syscache.h"
124 : #include "utils/usercontext.h"
125 :
/*
 * Validity of the locally cached per-table sync states.
 *
 * invalidate_syncing_table_states() resets it to
 * SYNC_TABLE_STATE_NEEDS_REBUILD; presumably FetchTableStates() moves it
 * through REBUILD_STARTED to VALID (TODO confirm against its definition).
 */
typedef enum
{
	SYNC_TABLE_STATE_NEEDS_REBUILD = 0,	/* cache is stale */
	SYNC_TABLE_STATE_REBUILD_STARTED = 1,	/* a rebuild has begun */
	SYNC_TABLE_STATE_VALID = 2	/* cache may be used as-is */
} SyncingTablesState;
132 :
133 : static SyncingTablesState table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
134 : static List *table_states_not_ready = NIL;
135 : static bool FetchTableStates(bool *started_tx);
136 :
137 : static StringInfo copybuf = NULL;
138 :
139 : /*
140 : * Exit routine for synchronization worker.
141 : */
142 : static void
143 : pg_attribute_noreturn()
144 320 : finish_sync_worker(void)
145 : {
146 : /*
147 : * Commit any outstanding transaction. This is the usual case, unless
148 : * there was nothing to do for the table.
149 : */
150 320 : if (IsTransactionState())
151 : {
152 320 : CommitTransactionCommand();
153 320 : pgstat_report_stat(true);
154 : }
155 :
156 : /* And flush all writes. */
157 320 : XLogFlush(GetXLogWriteRecPtr());
158 :
159 320 : StartTransactionCommand();
160 320 : ereport(LOG,
161 : (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
162 : MySubscription->name,
163 : get_rel_name(MyLogicalRepWorker->relid))));
164 320 : CommitTransactionCommand();
165 :
166 : /* Find the leader apply worker and signal it. */
167 320 : logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
168 :
169 : /* Stop gracefully */
170 320 : proc_exit(0);
171 : }
172 :
173 : /*
174 : * Wait until the relation sync state is set in the catalog to the expected
175 : * one; return true when it happens.
176 : *
177 : * Returns false if the table sync worker or the table itself have
178 : * disappeared, or the table state has been reset.
179 : *
180 : * Currently, this is used in the apply worker when transitioning from
181 : * CATCHUP state to SYNCDONE.
182 : */
183 : static bool
184 632 : wait_for_relation_state_change(Oid relid, char expected_state)
185 : {
186 : char state;
187 :
188 : for (;;)
189 316 : {
190 : LogicalRepWorker *worker;
191 : XLogRecPtr statelsn;
192 :
193 632 : CHECK_FOR_INTERRUPTS();
194 :
195 632 : InvalidateCatalogSnapshot();
196 632 : state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
197 : relid, &statelsn);
198 :
199 632 : if (state == SUBREL_STATE_UNKNOWN)
200 0 : break;
201 :
202 632 : if (state == expected_state)
203 316 : return true;
204 :
205 : /* Check if the sync worker is still running and bail if not. */
206 316 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
207 316 : worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
208 : false);
209 316 : LWLockRelease(LogicalRepWorkerLock);
210 316 : if (!worker)
211 0 : break;
212 :
213 316 : (void) WaitLatch(MyLatch,
214 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
215 : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
216 :
217 316 : ResetLatch(MyLatch);
218 : }
219 :
220 0 : return false;
221 : }
222 :
223 : /*
224 : * Wait until the apply worker changes the state of our synchronization
225 : * worker to the expected one.
226 : *
227 : * Used when transitioning from SYNCWAIT state to CATCHUP.
228 : *
229 : * Returns false if the apply worker has disappeared.
230 : */
231 : static bool
232 642 : wait_for_worker_state_change(char expected_state)
233 : {
234 : int rc;
235 :
236 : for (;;)
237 322 : {
238 : LogicalRepWorker *worker;
239 :
240 642 : CHECK_FOR_INTERRUPTS();
241 :
242 : /*
243 : * Done if already in correct state. (We assume this fetch is atomic
244 : * enough to not give a misleading answer if we do it with no lock.)
245 : */
246 642 : if (MyLogicalRepWorker->relstate == expected_state)
247 320 : return true;
248 :
249 : /*
250 : * Bail out if the apply worker has died, else signal it we're
251 : * waiting.
252 : */
253 322 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
254 322 : worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
255 : InvalidOid, false);
256 322 : if (worker && worker->proc)
257 322 : logicalrep_worker_wakeup_ptr(worker);
258 322 : LWLockRelease(LogicalRepWorkerLock);
259 322 : if (!worker)
260 0 : break;
261 :
262 : /*
263 : * Wait. We expect to get a latch signal back from the apply worker,
264 : * but use a timeout in case it dies without sending one.
265 : */
266 322 : rc = WaitLatch(MyLatch,
267 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
268 : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
269 :
270 322 : if (rc & WL_LATCH_SET)
271 322 : ResetLatch(MyLatch);
272 : }
273 :
274 0 : return false;
275 : }
276 :
277 : /*
278 : * Callback from syscache invalidation.
279 : */
280 : void
281 2924 : invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
282 : {
283 2924 : table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
284 2924 : }
285 :
286 : /*
287 : * Handle table synchronization cooperation from the synchronization
288 : * worker.
289 : *
290 : * If the sync worker is in CATCHUP state and reached (or passed) the
291 : * predetermined synchronization point in the WAL stream, mark the table as
292 : * SYNCDONE and finish.
293 : */
294 : static void
295 326 : process_syncing_tables_for_sync(XLogRecPtr current_lsn)
296 : {
297 326 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
298 :
299 326 : if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
300 326 : current_lsn >= MyLogicalRepWorker->relstate_lsn)
301 : {
302 : TimeLineID tli;
303 320 : char syncslotname[NAMEDATALEN] = {0};
304 320 : char originname[NAMEDATALEN] = {0};
305 :
306 320 : MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
307 320 : MyLogicalRepWorker->relstate_lsn = current_lsn;
308 :
309 320 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
310 :
311 : /*
312 : * UpdateSubscriptionRelState must be called within a transaction.
313 : */
314 320 : if (!IsTransactionState())
315 320 : StartTransactionCommand();
316 :
317 320 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
318 320 : MyLogicalRepWorker->relid,
319 320 : MyLogicalRepWorker->relstate,
320 320 : MyLogicalRepWorker->relstate_lsn);
321 :
322 : /*
323 : * End streaming so that LogRepWorkerWalRcvConn can be used to drop
324 : * the slot.
325 : */
326 320 : walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
327 :
328 : /*
329 : * Cleanup the tablesync slot.
330 : *
331 : * This has to be done after updating the state because otherwise if
332 : * there is an error while doing the database operations we won't be
333 : * able to rollback dropped slot.
334 : */
335 320 : ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
336 320 : MyLogicalRepWorker->relid,
337 : syncslotname,
338 : sizeof(syncslotname));
339 :
340 : /*
341 : * It is important to give an error if we are unable to drop the slot,
342 : * otherwise, it won't be dropped till the corresponding subscription
343 : * is dropped. So passing missing_ok = false.
344 : */
345 320 : ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
346 :
347 320 : CommitTransactionCommand();
348 320 : pgstat_report_stat(false);
349 :
350 : /*
351 : * Start a new transaction to clean up the tablesync origin tracking.
352 : * This transaction will be ended within the finish_sync_worker().
353 : * Now, even, if we fail to remove this here, the apply worker will
354 : * ensure to clean it up afterward.
355 : *
356 : * We need to do this after the table state is set to SYNCDONE.
357 : * Otherwise, if an error occurs while performing the database
358 : * operation, the worker will be restarted and the in-memory state of
359 : * replication progress (remote_lsn) won't be rolled-back which would
360 : * have been cleared before restart. So, the restarted worker will use
361 : * invalid replication progress state resulting in replay of
362 : * transactions that have already been applied.
363 : */
364 320 : StartTransactionCommand();
365 :
366 320 : ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
367 320 : MyLogicalRepWorker->relid,
368 : originname,
369 : sizeof(originname));
370 :
371 : /*
372 : * Resetting the origin session removes the ownership of the slot.
373 : * This is needed to allow the origin to be dropped.
374 : */
375 320 : replorigin_session_reset();
376 320 : replorigin_session_origin = InvalidRepOriginId;
377 320 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
378 320 : replorigin_session_origin_timestamp = 0;
379 :
380 : /*
381 : * Drop the tablesync's origin tracking if exists.
382 : *
383 : * There is a chance that the user is concurrently performing refresh
384 : * for the subscription where we remove the table state and its origin
385 : * or the apply worker would have removed this origin. So passing
386 : * missing_ok = true.
387 : */
388 320 : replorigin_drop_by_name(originname, true, false);
389 :
390 320 : finish_sync_worker();
391 : }
392 : else
393 6 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
394 6 : }
395 :
396 : /*
397 : * Handle table synchronization cooperation from the apply worker.
398 : *
399 : * Walk over all subscription tables that are individually tracked by the
400 : * apply process (currently, all that have state other than
401 : * SUBREL_STATE_READY) and manage synchronization for them.
402 : *
403 : * If there are tables that need synchronizing and are not being synchronized
404 : * yet, start sync workers for them (if there are free slots for sync
405 : * workers). To prevent starting the sync worker for the same relation at a
406 : * high frequency after a failure, we store its last start time with each sync
407 : * state info. We start the sync worker for the same relation after waiting
408 : * at least wal_retrieve_retry_interval.
409 : *
410 : * For tables that are being synchronized already, check if sync workers
411 : * either need action from the apply worker or have finished. This is the
412 : * SYNCWAIT to CATCHUP transition.
413 : *
414 : * If the synchronization position is reached (SYNCDONE), then the table can
415 : * be marked as READY and is no longer tracked.
416 : */
417 : static void
418 8770 : process_syncing_tables_for_apply(XLogRecPtr current_lsn)
419 : {
420 : struct tablesync_start_time_mapping
421 : {
422 : Oid relid;
423 : TimestampTz last_start_time;
424 : };
425 : static HTAB *last_start_times = NULL;
426 : ListCell *lc;
427 8770 : bool started_tx = false;
428 8770 : bool should_exit = false;
429 :
430 : Assert(!IsTransactionState());
431 :
432 : /* We need up-to-date sync state info for subscription tables here. */
433 8770 : FetchTableStates(&started_tx);
434 :
435 : /*
436 : * Prepare a hash table for tracking last start times of workers, to avoid
437 : * immediate restarts. We don't need it if there are no tables that need
438 : * syncing.
439 : */
440 8770 : if (table_states_not_ready != NIL && !last_start_times)
441 202 : {
442 : HASHCTL ctl;
443 :
444 202 : ctl.keysize = sizeof(Oid);
445 202 : ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
446 202 : last_start_times = hash_create("Logical replication table sync worker start times",
447 : 256, &ctl, HASH_ELEM | HASH_BLOBS);
448 : }
449 :
450 : /*
451 : * Clean up the hash table when we're done with all tables (just to
452 : * release the bit of memory).
453 : */
454 8568 : else if (table_states_not_ready == NIL && last_start_times)
455 : {
456 150 : hash_destroy(last_start_times);
457 150 : last_start_times = NULL;
458 : }
459 :
460 : /*
461 : * Process all tables that are being synchronized.
462 : */
463 11188 : foreach(lc, table_states_not_ready)
464 : {
465 2418 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
466 :
467 2418 : if (rstate->state == SUBREL_STATE_SYNCDONE)
468 : {
469 : /*
470 : * Apply has caught up to the position where the table sync has
471 : * finished. Mark the table as ready so that the apply will just
472 : * continue to replicate it normally.
473 : */
474 318 : if (current_lsn >= rstate->lsn)
475 : {
476 : char originname[NAMEDATALEN];
477 :
478 316 : rstate->state = SUBREL_STATE_READY;
479 316 : rstate->lsn = current_lsn;
480 316 : if (!started_tx)
481 : {
482 16 : StartTransactionCommand();
483 16 : started_tx = true;
484 : }
485 :
486 : /*
487 : * Remove the tablesync origin tracking if exists.
488 : *
489 : * There is a chance that the user is concurrently performing
490 : * refresh for the subscription where we remove the table
491 : * state and its origin or the tablesync worker would have
492 : * already removed this origin. We can't rely on tablesync
493 : * worker to remove the origin tracking as if there is any
494 : * error while dropping we won't restart it to drop the
495 : * origin. So passing missing_ok = true.
496 : */
497 316 : ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
498 : rstate->relid,
499 : originname,
500 : sizeof(originname));
501 316 : replorigin_drop_by_name(originname, true, false);
502 :
503 : /*
504 : * Update the state to READY only after the origin cleanup.
505 : */
506 316 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
507 316 : rstate->relid, rstate->state,
508 : rstate->lsn);
509 : }
510 : }
511 : else
512 : {
513 : LogicalRepWorker *syncworker;
514 :
515 : /*
516 : * Look for a sync worker for this relation.
517 : */
518 2100 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
519 :
520 2100 : syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
521 : rstate->relid, false);
522 :
523 2100 : if (syncworker)
524 : {
525 : /* Found one, update our copy of its state */
526 938 : SpinLockAcquire(&syncworker->relmutex);
527 938 : rstate->state = syncworker->relstate;
528 938 : rstate->lsn = syncworker->relstate_lsn;
529 938 : if (rstate->state == SUBREL_STATE_SYNCWAIT)
530 : {
531 : /*
532 : * Sync worker is waiting for apply. Tell sync worker it
533 : * can catchup now.
534 : */
535 316 : syncworker->relstate = SUBREL_STATE_CATCHUP;
536 316 : syncworker->relstate_lsn =
537 316 : Max(syncworker->relstate_lsn, current_lsn);
538 : }
539 938 : SpinLockRelease(&syncworker->relmutex);
540 :
541 : /* If we told worker to catch up, wait for it. */
542 938 : if (rstate->state == SUBREL_STATE_SYNCWAIT)
543 : {
544 : /* Signal the sync worker, as it may be waiting for us. */
545 316 : if (syncworker->proc)
546 316 : logicalrep_worker_wakeup_ptr(syncworker);
547 :
548 : /* Now safe to release the LWLock */
549 316 : LWLockRelease(LogicalRepWorkerLock);
550 :
551 316 : if (started_tx)
552 : {
553 : /*
554 : * We must commit the existing transaction to release
555 : * the existing locks before entering a busy loop.
556 : * This is required to avoid any undetected deadlocks
557 : * due to any existing lock as deadlock detector won't
558 : * be able to detect the waits on the latch.
559 : */
560 316 : CommitTransactionCommand();
561 316 : pgstat_report_stat(false);
562 : }
563 :
564 : /*
565 : * Enter busy loop and wait for synchronization worker to
566 : * reach expected state (or die trying).
567 : */
568 316 : StartTransactionCommand();
569 316 : started_tx = true;
570 :
571 316 : wait_for_relation_state_change(rstate->relid,
572 : SUBREL_STATE_SYNCDONE);
573 : }
574 : else
575 622 : LWLockRelease(LogicalRepWorkerLock);
576 : }
577 : else
578 : {
579 : /*
580 : * If there is no sync worker for this table yet, count
581 : * running sync workers for this subscription, while we have
582 : * the lock.
583 : */
584 : int nsyncworkers =
585 1162 : logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
586 :
587 : /* Now safe to release the LWLock */
588 1162 : LWLockRelease(LogicalRepWorkerLock);
589 :
590 : /*
591 : * If there are free sync worker slot(s), start a new sync
592 : * worker for the table.
593 : */
594 1162 : if (nsyncworkers < max_sync_workers_per_subscription)
595 : {
596 364 : TimestampTz now = GetCurrentTimestamp();
597 : struct tablesync_start_time_mapping *hentry;
598 : bool found;
599 :
600 364 : hentry = hash_search(last_start_times, &rstate->relid,
601 : HASH_ENTER, &found);
602 :
603 402 : if (!found ||
604 38 : TimestampDifferenceExceeds(hentry->last_start_time, now,
605 : wal_retrieve_retry_interval))
606 : {
607 338 : logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
608 338 : MyLogicalRepWorker->dbid,
609 338 : MySubscription->oid,
610 338 : MySubscription->name,
611 338 : MyLogicalRepWorker->userid,
612 : rstate->relid,
613 : DSM_HANDLE_INVALID);
614 338 : hentry->last_start_time = now;
615 : }
616 : }
617 : }
618 : }
619 : }
620 :
621 8770 : if (started_tx)
622 : {
623 : /*
624 : * Even when the two_phase mode is requested by the user, it remains
625 : * as 'pending' until all tablesyncs have reached READY state.
626 : *
627 : * When this happens, we restart the apply worker and (if the
628 : * conditions are still ok) then the two_phase tri-state will become
629 : * 'enabled' at that time.
630 : *
631 : * Note: If the subscription has no tables then leave the state as
632 : * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
633 : * work.
634 : */
635 1408 : if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
636 : {
637 50 : CommandCounterIncrement(); /* make updates visible */
638 50 : if (AllTablesyncsReady())
639 : {
640 12 : ereport(LOG,
641 : (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
642 : MySubscription->name)));
643 12 : should_exit = true;
644 : }
645 : }
646 :
647 1408 : CommitTransactionCommand();
648 1408 : pgstat_report_stat(true);
649 : }
650 :
651 8770 : if (should_exit)
652 : {
653 : /*
654 : * Reset the last-start time for this worker so that the launcher will
655 : * restart it without waiting for wal_retrieve_retry_interval.
656 : */
657 12 : ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
658 :
659 12 : proc_exit(0);
660 : }
661 8758 : }
662 :
663 : /*
664 : * Process possible state change(s) of tables that are being synchronized.
665 : */
666 : void
667 9140 : process_syncing_tables(XLogRecPtr current_lsn)
668 : {
669 9140 : switch (MyLogicalRepWorker->type)
670 : {
671 44 : case WORKERTYPE_PARALLEL_APPLY:
672 :
673 : /*
674 : * Skip for parallel apply workers because they only operate on
675 : * tables that are in a READY state. See pa_can_start() and
676 : * should_apply_changes_for_rel().
677 : */
678 44 : break;
679 :
680 326 : case WORKERTYPE_TABLESYNC:
681 326 : process_syncing_tables_for_sync(current_lsn);
682 6 : break;
683 :
684 8770 : case WORKERTYPE_APPLY:
685 8770 : process_syncing_tables_for_apply(current_lsn);
686 8758 : break;
687 :
688 0 : case WORKERTYPE_UNKNOWN:
689 : /* Should never happen. */
690 0 : elog(ERROR, "Unknown worker type");
691 : }
692 8808 : }
693 :
694 : /*
695 : * Create list of columns for COPY based on logical relation mapping.
696 : */
697 : static List *
698 342 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
699 : {
700 342 : List *attnamelist = NIL;
701 : int i;
702 :
703 886 : for (i = 0; i < rel->remoterel.natts; i++)
704 : {
705 544 : attnamelist = lappend(attnamelist,
706 544 : makeString(rel->remoterel.attnames[i]));
707 : }
708 :
709 :
710 342 : return attnamelist;
711 : }
712 :
713 : /*
714 : * Data source callback for the COPY FROM, which reads from the remote
715 : * connection and passes the data back to our local COPY.
716 : */
717 : static int
718 27964 : copy_read_data(void *outbuf, int minread, int maxread)
719 : {
720 27964 : int bytesread = 0;
721 : int avail;
722 :
723 : /* If there are some leftover data from previous read, use it. */
724 27964 : avail = copybuf->len - copybuf->cursor;
725 27964 : if (avail)
726 : {
727 0 : if (avail > maxread)
728 0 : avail = maxread;
729 0 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
730 0 : copybuf->cursor += avail;
731 0 : maxread -= avail;
732 0 : bytesread += avail;
733 : }
734 :
735 27966 : while (maxread > 0 && bytesread < minread)
736 : {
737 27966 : pgsocket fd = PGINVALID_SOCKET;
738 : int len;
739 27966 : char *buf = NULL;
740 :
741 : for (;;)
742 : {
743 : /* Try read the data. */
744 27966 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
745 :
746 27966 : CHECK_FOR_INTERRUPTS();
747 :
748 27966 : if (len == 0)
749 2 : break;
750 27964 : else if (len < 0)
751 27964 : return bytesread;
752 : else
753 : {
754 : /* Process the data */
755 27626 : copybuf->data = buf;
756 27626 : copybuf->len = len;
757 27626 : copybuf->cursor = 0;
758 :
759 27626 : avail = copybuf->len - copybuf->cursor;
760 27626 : if (avail > maxread)
761 0 : avail = maxread;
762 27626 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
763 27626 : outbuf = (void *) ((char *) outbuf + avail);
764 27626 : copybuf->cursor += avail;
765 27626 : maxread -= avail;
766 27626 : bytesread += avail;
767 : }
768 :
769 27626 : if (maxread <= 0 || bytesread >= minread)
770 27626 : return bytesread;
771 : }
772 :
773 : /*
774 : * Wait for more data or latch.
775 : */
776 2 : (void) WaitLatchOrSocket(MyLatch,
777 : WL_SOCKET_READABLE | WL_LATCH_SET |
778 : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
779 : fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
780 :
781 2 : ResetLatch(MyLatch);
782 : }
783 :
784 0 : return bytesread;
785 : }
786 :
787 :
788 : /*
789 : * Get information about remote relation in similar fashion the RELATION
790 : * message provides during replication. This function also returns the relation
791 : * qualifications to be used in the COPY command.
792 : */
793 : static void
794 342 : fetch_remote_table_info(char *nspname, char *relname,
795 : LogicalRepRelation *lrel, List **qual)
796 : {
797 : WalRcvExecResult *res;
798 : StringInfoData cmd;
799 : TupleTableSlot *slot;
800 342 : Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
801 342 : Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
802 342 : Oid qualRow[] = {TEXTOID};
803 : bool isnull;
804 : int natt;
805 : ListCell *lc;
806 342 : Bitmapset *included_cols = NULL;
807 :
808 342 : lrel->nspname = nspname;
809 342 : lrel->relname = relname;
810 :
811 : /* First fetch Oid and replica identity. */
812 342 : initStringInfo(&cmd);
813 342 : appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
814 : " FROM pg_catalog.pg_class c"
815 : " INNER JOIN pg_catalog.pg_namespace n"
816 : " ON (c.relnamespace = n.oid)"
817 : " WHERE n.nspname = %s"
818 : " AND c.relname = %s",
819 : quote_literal_cstr(nspname),
820 : quote_literal_cstr(relname));
821 342 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
822 : lengthof(tableRow), tableRow);
823 :
824 342 : if (res->status != WALRCV_OK_TUPLES)
825 0 : ereport(ERROR,
826 : (errcode(ERRCODE_CONNECTION_FAILURE),
827 : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
828 : nspname, relname, res->err)));
829 :
830 342 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
831 342 : if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
832 0 : ereport(ERROR,
833 : (errcode(ERRCODE_UNDEFINED_OBJECT),
834 : errmsg("table \"%s.%s\" not found on publisher",
835 : nspname, relname)));
836 :
837 342 : lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
838 : Assert(!isnull);
839 342 : lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
840 : Assert(!isnull);
841 342 : lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
842 : Assert(!isnull);
843 :
844 342 : ExecDropSingleTupleTableSlot(slot);
845 342 : walrcv_clear_result(res);
846 :
847 :
848 : /*
849 : * Get column lists for each relation.
850 : *
851 : * We need to do this before fetching info about column names and types,
852 : * so that we can skip columns that should not be replicated.
853 : */
854 342 : if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
855 : {
856 : WalRcvExecResult *pubres;
857 : TupleTableSlot *tslot;
858 342 : Oid attrsRow[] = {INT2VECTOROID};
859 : StringInfoData pub_names;
860 :
861 342 : initStringInfo(&pub_names);
862 1056 : foreach(lc, MySubscription->publications)
863 : {
864 714 : if (foreach_current_index(lc) > 0)
865 372 : appendStringInfoString(&pub_names, ", ");
866 714 : appendStringInfoString(&pub_names, quote_literal_cstr(strVal(lfirst(lc))));
867 : }
868 :
869 : /*
870 : * Fetch info about column lists for the relation (from all the
871 : * publications).
872 : */
873 342 : resetStringInfo(&cmd);
874 342 : appendStringInfo(&cmd,
875 : "SELECT DISTINCT"
876 : " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
877 : " THEN NULL ELSE gpt.attrs END)"
878 : " FROM pg_publication p,"
879 : " LATERAL pg_get_publication_tables(p.pubname) gpt,"
880 : " pg_class c"
881 : " WHERE gpt.relid = %u AND c.oid = gpt.relid"
882 : " AND p.pubname IN ( %s )",
883 : lrel->remoteid,
884 : pub_names.data);
885 :
886 342 : pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
887 : lengthof(attrsRow), attrsRow);
888 :
889 342 : if (pubres->status != WALRCV_OK_TUPLES)
890 0 : ereport(ERROR,
891 : (errcode(ERRCODE_CONNECTION_FAILURE),
892 : errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
893 : nspname, relname, pubres->err)));
894 :
895 : /*
896 : * We don't support the case where the column list is different for
897 : * the same table when combining publications. See comments atop
898 : * fetch_table_list. So there should be only one row returned.
899 : * Although we already checked this when creating the subscription, we
900 : * still need to check here in case the column list was changed after
901 : * creating the subscription and before the sync worker is started.
902 : */
903 342 : if (tuplestore_tuple_count(pubres->tuplestore) > 1)
904 0 : ereport(ERROR,
905 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
906 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
907 : nspname, relname));
908 :
909 : /*
910 : * Get the column list and build a single bitmap with the attnums.
911 : *
912 : * If we find a NULL value, it means all the columns should be
913 : * replicated.
914 : */
915 342 : tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
916 342 : if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
917 : {
918 342 : Datum cfval = slot_getattr(tslot, 1, &isnull);
919 :
920 342 : if (!isnull)
921 : {
922 : ArrayType *arr;
923 : int nelems;
924 : int16 *elems;
925 :
926 38 : arr = DatumGetArrayTypeP(cfval);
927 38 : nelems = ARR_DIMS(arr)[0];
928 38 : elems = (int16 *) ARR_DATA_PTR(arr);
929 :
930 106 : for (natt = 0; natt < nelems; natt++)
931 68 : included_cols = bms_add_member(included_cols, elems[natt]);
932 : }
933 :
934 342 : ExecClearTuple(tslot);
935 : }
936 342 : ExecDropSingleTupleTableSlot(tslot);
937 :
938 342 : walrcv_clear_result(pubres);
939 :
940 342 : pfree(pub_names.data);
941 : }
942 :
943 : /*
944 : * Now fetch column names and types.
945 : */
946 342 : resetStringInfo(&cmd);
947 342 : appendStringInfo(&cmd,
948 : "SELECT a.attnum,"
949 : " a.attname,"
950 : " a.atttypid,"
951 : " a.attnum = ANY(i.indkey)"
952 : " FROM pg_catalog.pg_attribute a"
953 : " LEFT JOIN pg_catalog.pg_index i"
954 : " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
955 : " WHERE a.attnum > 0::pg_catalog.int2"
956 : " AND NOT a.attisdropped %s"
957 : " AND a.attrelid = %u"
958 : " ORDER BY a.attnum",
959 : lrel->remoteid,
960 342 : (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
961 : "AND a.attgenerated = ''" : ""),
962 : lrel->remoteid);
963 342 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
964 : lengthof(attrRow), attrRow);
965 :
966 342 : if (res->status != WALRCV_OK_TUPLES)
967 0 : ereport(ERROR,
968 : (errcode(ERRCODE_CONNECTION_FAILURE),
969 : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
970 : nspname, relname, res->err)));
971 :
972 : /* We don't know the number of rows coming, so allocate enough space. */
973 342 : lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
974 342 : lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
975 342 : lrel->attkeys = NULL;
976 :
977 : /*
978 : * Store the columns as a list of names. Ignore those that are not
979 : * present in the column list, if there is one.
980 : */
981 342 : natt = 0;
982 342 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
983 930 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
984 : {
985 : char *rel_colname;
986 : AttrNumber attnum;
987 :
988 588 : attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
989 : Assert(!isnull);
990 :
991 : /* If the column is not in the column list, skip it. */
992 588 : if (included_cols != NULL && !bms_is_member(attnum, included_cols))
993 : {
994 44 : ExecClearTuple(slot);
995 44 : continue;
996 : }
997 :
998 544 : rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
999 : Assert(!isnull);
1000 :
1001 544 : lrel->attnames[natt] = rel_colname;
1002 544 : lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
1003 : Assert(!isnull);
1004 :
1005 544 : if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
1006 200 : lrel->attkeys = bms_add_member(lrel->attkeys, natt);
1007 :
1008 : /* Should never happen. */
1009 544 : if (++natt >= MaxTupleAttributeNumber)
1010 0 : elog(ERROR, "too many columns in remote table \"%s.%s\"",
1011 : nspname, relname);
1012 :
1013 544 : ExecClearTuple(slot);
1014 : }
1015 342 : ExecDropSingleTupleTableSlot(slot);
1016 :
1017 342 : lrel->natts = natt;
1018 :
1019 342 : walrcv_clear_result(res);
1020 :
1021 : /*
1022 : * Get relation's row filter expressions. DISTINCT avoids the same
1023 : * expression of a table in multiple publications from being included
1024 : * multiple times in the final expression.
1025 : *
1026 : * We need to copy the row even if it matches just one of the
1027 : * publications, so we later combine all the quals with OR.
1028 : *
1029 : * For initial synchronization, row filtering can be ignored in following
1030 : * cases:
1031 : *
1032 : * 1) one of the subscribed publications for the table hasn't specified
1033 : * any row filter
1034 : *
1035 : * 2) one of the subscribed publications has puballtables set to true
1036 : *
1037 : * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
1038 : * that includes this relation
1039 : */
1040 342 : if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
1041 : {
1042 : StringInfoData pub_names;
1043 :
1044 : /* Build the pubname list. */
1045 342 : initStringInfo(&pub_names);
1046 1398 : foreach_node(String, pubstr, MySubscription->publications)
1047 : {
1048 714 : char *pubname = strVal(pubstr);
1049 :
1050 714 : if (foreach_current_index(pubstr) > 0)
1051 372 : appendStringInfoString(&pub_names, ", ");
1052 :
1053 714 : appendStringInfoString(&pub_names, quote_literal_cstr(pubname));
1054 : }
1055 :
1056 : /* Check for row filters. */
1057 342 : resetStringInfo(&cmd);
1058 342 : appendStringInfo(&cmd,
1059 : "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
1060 : " FROM pg_publication p,"
1061 : " LATERAL pg_get_publication_tables(p.pubname) gpt"
1062 : " WHERE gpt.relid = %u"
1063 : " AND p.pubname IN ( %s )",
1064 : lrel->remoteid,
1065 : pub_names.data);
1066 :
1067 342 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
1068 :
1069 342 : if (res->status != WALRCV_OK_TUPLES)
1070 0 : ereport(ERROR,
1071 : (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
1072 : nspname, relname, res->err)));
1073 :
1074 : /*
1075 : * Multiple row filter expressions for the same table will be combined
1076 : * by COPY using OR. If any of the filter expressions for this table
1077 : * are null, it means the whole table will be copied. In this case it
1078 : * is not necessary to construct a unified row filter expression at
1079 : * all.
1080 : */
1081 342 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
1082 370 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1083 : {
1084 350 : Datum rf = slot_getattr(slot, 1, &isnull);
1085 :
1086 350 : if (!isnull)
1087 28 : *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
1088 : else
1089 : {
1090 : /* Ignore filters and cleanup as necessary. */
1091 322 : if (*qual)
1092 : {
1093 6 : list_free_deep(*qual);
1094 6 : *qual = NIL;
1095 : }
1096 322 : break;
1097 : }
1098 :
1099 28 : ExecClearTuple(slot);
1100 : }
1101 342 : ExecDropSingleTupleTableSlot(slot);
1102 :
1103 342 : walrcv_clear_result(res);
1104 : }
1105 :
1106 342 : pfree(cmd.data);
1107 342 : }
1108 :
1109 : /*
1110 : * Copy existing data of a table from publisher.
1111 : *
1112 : * Caller is responsible for locking the local relation.
1113 : */
1114 : static void
1115 342 : copy_table(Relation rel)
1116 : {
1117 : LogicalRepRelMapEntry *relmapentry;
1118 : LogicalRepRelation lrel;
1119 342 : List *qual = NIL;
1120 : WalRcvExecResult *res;
1121 : StringInfoData cmd;
1122 : CopyFromState cstate;
1123 : List *attnamelist;
1124 : ParseState *pstate;
1125 342 : List *options = NIL;
1126 :
1127 : /* Get the publisher relation info. */
1128 342 : fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
1129 342 : RelationGetRelationName(rel), &lrel, &qual);
1130 :
1131 : /* Put the relation into relmap. */
1132 342 : logicalrep_relmap_update(&lrel);
1133 :
1134 : /* Map the publisher relation to local one. */
1135 342 : relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
1136 : Assert(rel == relmapentry->localrel);
1137 :
1138 : /* Start copy on the publisher. */
1139 342 : initStringInfo(&cmd);
1140 :
1141 : /* Regular table with no row filter */
1142 342 : if (lrel.relkind == RELKIND_RELATION && qual == NIL)
1143 : {
1144 294 : appendStringInfo(&cmd, "COPY %s",
1145 294 : quote_qualified_identifier(lrel.nspname, lrel.relname));
1146 :
1147 : /* If the table has columns, then specify the columns */
1148 294 : if (lrel.natts)
1149 : {
1150 292 : appendStringInfoString(&cmd, " (");
1151 :
1152 : /*
1153 : * XXX Do we need to list the columns in all cases? Maybe we're
1154 : * replicating all columns?
1155 : */
1156 764 : for (int i = 0; i < lrel.natts; i++)
1157 : {
1158 472 : if (i > 0)
1159 180 : appendStringInfoString(&cmd, ", ");
1160 :
1161 472 : appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
1162 : }
1163 :
1164 292 : appendStringInfoChar(&cmd, ')');
1165 : }
1166 :
1167 294 : appendStringInfoString(&cmd, " TO STDOUT");
1168 : }
1169 : else
1170 : {
1171 : /*
1172 : * For non-tables and tables with row filters, we need to do COPY
1173 : * (SELECT ...), but we can't just do SELECT * because we need to not
1174 : * copy generated columns. For tables with any row filters, build a
1175 : * SELECT query with OR'ed row filters for COPY.
1176 : */
1177 48 : appendStringInfoString(&cmd, "COPY (SELECT ");
1178 120 : for (int i = 0; i < lrel.natts; i++)
1179 : {
1180 72 : appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
1181 72 : if (i < lrel.natts - 1)
1182 24 : appendStringInfoString(&cmd, ", ");
1183 : }
1184 :
1185 48 : appendStringInfoString(&cmd, " FROM ");
1186 :
1187 : /*
1188 : * For regular tables, make sure we don't copy data from a child that
1189 : * inherits the named table as those will be copied separately.
1190 : */
1191 48 : if (lrel.relkind == RELKIND_RELATION)
1192 14 : appendStringInfoString(&cmd, "ONLY ");
1193 :
1194 48 : appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
1195 : /* list of OR'ed filters */
1196 48 : if (qual != NIL)
1197 : {
1198 : ListCell *lc;
1199 20 : char *q = strVal(linitial(qual));
1200 :
1201 20 : appendStringInfo(&cmd, " WHERE %s", q);
1202 22 : for_each_from(lc, qual, 1)
1203 : {
1204 2 : q = strVal(lfirst(lc));
1205 2 : appendStringInfo(&cmd, " OR %s", q);
1206 : }
1207 20 : list_free_deep(qual);
1208 : }
1209 :
1210 48 : appendStringInfoString(&cmd, ") TO STDOUT");
1211 : }
1212 :
1213 : /*
1214 : * Prior to v16, initial table synchronization will use text format even
1215 : * if the binary option is enabled for a subscription.
1216 : */
1217 342 : if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
1218 342 : MySubscription->binary)
1219 : {
1220 10 : appendStringInfoString(&cmd, " WITH (FORMAT binary)");
1221 10 : options = list_make1(makeDefElem("format",
1222 : (Node *) makeString("binary"), -1));
1223 : }
1224 :
1225 342 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
1226 342 : pfree(cmd.data);
1227 342 : if (res->status != WALRCV_OK_COPY_OUT)
1228 0 : ereport(ERROR,
1229 : (errcode(ERRCODE_CONNECTION_FAILURE),
1230 : errmsg("could not start initial contents copy for table \"%s.%s\": %s",
1231 : lrel.nspname, lrel.relname, res->err)));
1232 342 : walrcv_clear_result(res);
1233 :
1234 342 : copybuf = makeStringInfo();
1235 :
1236 342 : pstate = make_parsestate(NULL);
1237 342 : (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
1238 : NULL, false, false);
1239 :
1240 342 : attnamelist = make_copy_attnamelist(relmapentry);
1241 342 : cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
1242 :
1243 : /* Do the copy */
1244 340 : (void) CopyFrom(cstate);
1245 :
1246 320 : logicalrep_rel_close(relmapentry, NoLock);
1247 320 : }
1248 :
1249 : /*
1250 : * Determine the tablesync slot name.
1251 : *
1252 : * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
1253 : * on slot name length. We append system_identifier to avoid slot_name
1254 : * collision with subscriptions in other clusters. With the current scheme
1255 : * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
1256 : * length of slot_name will be 50.
1257 : *
1258 : * The returned slot name is stored in the supplied buffer (syncslotname) with
1259 : * the given size.
1260 : *
1261 : * Note: We don't use the subscription slot name as part of tablesync slot name
1262 : * because we are responsible for cleaning up these slots and it could become
1263 : * impossible to recalculate what name to cleanup if the subscription slot name
1264 : * had changed.
1265 : */
1266 : void
1267 666 : ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
1268 : char *syncslotname, Size szslot)
1269 : {
1270 666 : snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
1271 : relid, GetSystemIdentifier());
1272 666 : }
1273 :
/*
 * Start syncing the table in the sync worker.
 *
 * If nothing needs to be done to sync the table, we exit the worker without
 * any further action.
 *
 * The returned slot name is palloc'ed in current memory context.
 *
 * Outline of what this function does:
 *  1. Read the table's persisted sync state and publish it in shared memory;
 *     exit the worker if the state is SYNCDONE, READY or UNKNOWN.
 *  2. Connect to the publisher, using the tablesync slot name as the
 *     application_name.
 *  3. If the state is FINISHEDCOPY, the copy already completed earlier: set
 *     up the existing replication origin, take its recorded progress as
 *     *origin_startpos and skip straight to step 6.
 *  4. Otherwise (INIT or DATASYNC; a leftover slot is dropped for DATASYNC),
 *     persist DATASYNC, open a REPEATABLE READ transaction on the publisher,
 *     create the permanent tablesync slot (its returned LSN is stored in
 *     *origin_startpos) and a new replication origin advanced to that LSN.
 *  5. Copy the data (as the table owner unless run_as_owner is set), commit
 *     on the publisher and persist FINISHEDCOPY locally.
 *  6. Advertise SYNCWAIT with *origin_startpos in shared memory and wait for
 *     the leader apply worker to move us to CATCHUP.
 */
static char *
LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
{
	char	   *slotname;
	char	   *err;
	char		relstate;
	XLogRecPtr	relstate_lsn;
	Relation	rel;
	AclResult	aclresult;
	WalRcvExecResult *res;
	char		originname[NAMEDATALEN];
	RepOriginId originid;
	UserContext ucxt;			/* only used when !run_as_owner */
	bool		must_use_password;
	bool		run_as_owner;

	/* Check the state of the table synchronization. */
	StartTransactionCommand();
	relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
									   MyLogicalRepWorker->relid,
									   &relstate_lsn);
	CommitTransactionCommand();

	/* Is the use of a password mandatory? */
	must_use_password = MySubscription->passwordrequired &&
		!MySubscription->ownersuperuser;

	/* Publish the persisted state in our shared worker slot. */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
	MyLogicalRepWorker->relstate = relstate;
	MyLogicalRepWorker->relstate_lsn = relstate_lsn;
	SpinLockRelease(&MyLogicalRepWorker->relmutex);

	/*
	 * If synchronization is already done or no longer necessary, exit now
	 * that we've updated shared memory state.  Any other state falls out of
	 * the switch and continues below (see the Assert after connecting).
	 */
	switch (relstate)
	{
		case SUBREL_STATE_SYNCDONE:
		case SUBREL_STATE_READY:
		case SUBREL_STATE_UNKNOWN:
			finish_sync_worker();	/* doesn't return */
	}

	/* Calculate the name of the tablesync slot. */
	slotname = (char *) palloc(NAMEDATALEN);
	ReplicationSlotNameForTablesync(MySubscription->oid,
									MyLogicalRepWorker->relid,
									slotname,
									NAMEDATALEN);

	/*
	 * Here we use the slot name instead of the subscription name as the
	 * application_name, so that it is different from the leader apply worker,
	 * so that synchronous replication can distinguish them.
	 */
	LogRepWorkerWalRcvConn =
		walrcv_connect(MySubscription->conninfo, true, true,
					   must_use_password,
					   slotname, &err);
	if (LogRepWorkerWalRcvConn == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_FAILURE),
				 errmsg("could not connect to the publisher: %s", err)));

	Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
		   MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);

	/* Assign the origin tracking record name. */
	ReplicationOriginNameForLogicalRep(MySubscription->oid,
									   MyLogicalRepWorker->relid,
									   originname,
									   sizeof(originname));

	if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
	{
		/*
		 * We have previously errored out before finishing the copy so the
		 * replication slot might exist. We want to remove the slot if it
		 * already exists and proceed.
		 *
		 * XXX We could also instead try to drop the slot, last time we failed
		 * but for that, we might need to clean up the copy state as it might
		 * be in the middle of fetching the rows. Also, if there is a network
		 * breakdown then it wouldn't have succeeded so trying it next time
		 * seems like a better bet.
		 */
		ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
	}
	else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
	{
		/*
		 * The COPY phase was previously done, but tablesync then crashed
		 * before it was able to finish normally.
		 */
		StartTransactionCommand();

		/*
		 * The origin tracking name must already exist. It was created first
		 * time this tablesync was launched.
		 */
		originid = replorigin_by_name(originname, false);
		replorigin_session_setup(originid, 0);
		replorigin_session_origin = originid;
		/* Resume catchup from the origin's recorded progress. */
		*origin_startpos = replorigin_session_get_progress(false);

		CommitTransactionCommand();

		/* Skip the slot creation and the data copy entirely. */
		goto copy_table_done;
	}

	/* INIT or DATASYNC: (re)do the copy, starting with the DATASYNC state. */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
	MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
	MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
	SpinLockRelease(&MyLogicalRepWorker->relmutex);

	/* Update the state and make it visible to others. */
	StartTransactionCommand();
	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
							   MyLogicalRepWorker->relid,
							   MyLogicalRepWorker->relstate,
							   MyLogicalRepWorker->relstate_lsn);
	CommitTransactionCommand();
	pgstat_report_stat(true);

	/*
	 * This local transaction stays open through the copy and is committed
	 * only after FINISHEDCOPY has been recorded below.
	 */
	StartTransactionCommand();

	/*
	 * Use a standard write lock here. It might be better to disallow access
	 * to the table while it's being synchronized. But we don't want to block
	 * the main apply process from working and it has to open the relation in
	 * RowExclusiveLock when remapping remote relation id to local one.
	 */
	rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);

	/*
	 * Start a transaction in the remote node in REPEATABLE READ mode. This
	 * ensures that both the replication slot we create (see below) and the
	 * COPY are consistent with each other.
	 */
	res = walrcv_exec(LogRepWorkerWalRcvConn,
					  "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
					  0, NULL);
	if (res->status != WALRCV_OK_COMMAND)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_FAILURE),
				 errmsg("table copy could not start transaction on publisher: %s",
						res->err)));
	walrcv_clear_result(res);

	/*
	 * Create a new permanent logical decoding slot. This slot will be used
	 * for the catchup phase after COPY is done, so tell it to use the
	 * snapshot to make the final data consistent.  The LSN reported by slot
	 * creation is stored into *origin_startpos.
	 */
	walrcv_create_slot(LogRepWorkerWalRcvConn,
					   slotname, false /* permanent */ , false /* two_phase */ ,
					   MySubscription->failover,
					   CRS_USE_SNAPSHOT, origin_startpos);

	/*
	 * Setup replication origin tracking. The purpose of doing this before the
	 * copy is to avoid doing the copy again due to any error in setting up
	 * origin tracking.
	 */
	originid = replorigin_by_name(originname, true);
	if (!OidIsValid(originid))
	{
		/*
		 * Origin tracking does not exist, so create it now.
		 *
		 * Then advance to the LSN got from walrcv_create_slot. This is WAL
		 * logged for the purpose of recovery. Locks are to prevent the
		 * replication origin from vanishing while advancing.
		 */
		originid = replorigin_create(originname);

		LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
		replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
						   true /* go backward */ , true /* WAL log */ );
		UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);

		replorigin_session_setup(originid, 0);
		replorigin_session_origin = originid;
	}
	else
	{
		/* An origin for a table that has not finished copying is an error. */
		ereport(ERROR,
				(errcode(ERRCODE_DUPLICATE_OBJECT),
				 errmsg("replication origin \"%s\" already exists",
						originname)));
	}

	/*
	 * Make sure that the copy command runs as the table owner, unless the
	 * user has opted out of that behaviour.
	 */
	run_as_owner = MySubscription->runasowner;
	if (!run_as_owner)
		SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);

	/*
	 * Check that our table sync worker has permission to insert into the
	 * target table.
	 */
	aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
								  ACL_INSERT);
	if (aclresult != ACLCHECK_OK)
		aclcheck_error(aclresult,
					   get_relkind_objtype(rel->rd_rel->relkind),
					   RelationGetRelationName(rel));

	/*
	 * COPY FROM does not honor RLS policies. That is not a problem for
	 * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
	 * who has it implicitly), but other roles should not be able to
	 * circumvent RLS. Disallow logical replication into RLS enabled
	 * relations for such roles.
	 */
	if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
						GetUserNameFromId(GetUserId(), true),
						RelationGetRelationName(rel))));

	/* Now do the initial data copy */
	PushActiveSnapshot(GetTransactionSnapshot());
	copy_table(rel);
	PopActiveSnapshot();

	/* End the REPEATABLE READ transaction opened on the publisher above. */
	res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
	if (res->status != WALRCV_OK_COMMAND)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_FAILURE),
				 errmsg("table copy could not finish transaction on publisher: %s",
						res->err)));
	walrcv_clear_result(res);

	if (!run_as_owner)
		RestoreUserContext(&ucxt);

	/* Keep the lock until the local transaction commits. */
	table_close(rel, NoLock);

	/* Make the copy visible. */
	CommandCounterIncrement();

	/*
	 * Update the persisted state to indicate the COPY phase is done; make it
	 * visible to others.
	 */
	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
							   MyLogicalRepWorker->relid,
							   SUBREL_STATE_FINISHEDCOPY,
							   MyLogicalRepWorker->relstate_lsn);

	CommitTransactionCommand();

copy_table_done:

	elog(DEBUG1,
		 "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
		 originname, LSN_FORMAT_ARGS(*origin_startpos));

	/*
	 * We are done with the initial data synchronization, update the state.
	 */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
	MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
	MyLogicalRepWorker->relstate_lsn = *origin_startpos;
	SpinLockRelease(&MyLogicalRepWorker->relmutex);

	/*
	 * Finally, wait until the leader apply worker tells us to catch up and
	 * then return to let LogicalRepApplyLoop do it.
	 */
	wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
	return slotname;
}
1562 :
1563 : /*
1564 : * Common code to fetch the up-to-date sync state info into the static lists.
1565 : *
1566 : * Returns true if subscription has 1 or more tables, else false.
1567 : *
1568 : * Note: If this function started the transaction (indicated by the parameter)
1569 : * then it is the caller's responsibility to commit it.
1570 : */
1571 : static bool
1572 8896 : FetchTableStates(bool *started_tx)
1573 : {
1574 : static bool has_subrels = false;
1575 :
1576 8896 : *started_tx = false;
1577 :
1578 8896 : if (table_states_validity != SYNC_TABLE_STATE_VALID)
1579 : {
1580 : MemoryContext oldctx;
1581 : List *rstates;
1582 : ListCell *lc;
1583 : SubscriptionRelState *rstate;
1584 :
1585 1450 : table_states_validity = SYNC_TABLE_STATE_REBUILD_STARTED;
1586 :
1587 : /* Clean the old lists. */
1588 1450 : list_free_deep(table_states_not_ready);
1589 1450 : table_states_not_ready = NIL;
1590 :
1591 1450 : if (!IsTransactionState())
1592 : {
1593 1414 : StartTransactionCommand();
1594 1414 : *started_tx = true;
1595 : }
1596 :
1597 : /* Fetch all non-ready tables. */
1598 1450 : rstates = GetSubscriptionRelations(MySubscription->oid, true);
1599 :
1600 : /* Allocate the tracking info in a permanent memory context. */
1601 1450 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1602 3686 : foreach(lc, rstates)
1603 : {
1604 2236 : rstate = palloc(sizeof(SubscriptionRelState));
1605 2236 : memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
1606 2236 : table_states_not_ready = lappend(table_states_not_ready, rstate);
1607 : }
1608 1450 : MemoryContextSwitchTo(oldctx);
1609 :
1610 : /*
1611 : * Does the subscription have tables?
1612 : *
1613 : * If there were not-READY relations found then we know it does. But
1614 : * if table_states_not_ready was empty we still need to check again to
1615 : * see if there are 0 tables.
1616 : */
1617 1840 : has_subrels = (table_states_not_ready != NIL) ||
1618 390 : HasSubscriptionRelations(MySubscription->oid);
1619 :
1620 : /*
1621 : * If the subscription relation cache has been invalidated since we
1622 : * entered this routine, we still use and return the relations we just
1623 : * finished constructing, to avoid infinite loops, but we leave the
1624 : * table states marked as stale so that we'll rebuild it again on next
1625 : * access. Otherwise, we mark the table states as valid.
1626 : */
1627 1450 : if (table_states_validity == SYNC_TABLE_STATE_REBUILD_STARTED)
1628 1450 : table_states_validity = SYNC_TABLE_STATE_VALID;
1629 : }
1630 :
1631 8896 : return has_subrels;
1632 : }
1633 :
/*
 * Execute the initial sync with error handling. Disable the subscription,
 * if it's required.
 *
 * Allocate the slot name in long-lived context on return. Note that we don't
 * handle FATAL errors which are probably because of system resource error and
 * are not repeatable.
 *
 * origin_startpos: filled in by LogicalRepSyncTableStart().
 * slotname: on success, receives a copy of the tablesync slot name
 *           allocated in ApplyContext.
 *
 * On ERROR: if MySubscription->disableonerr is set, the subscription is
 * disabled via DisableSubscriptionAndExit() (NOTE(review): assumed not to
 * return, per its name -- confirm).  Otherwise, the error is reported to the
 * subscription statistics after aborting the transaction, and then re-thrown.
 */
static void
start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
{
	/*
	 * Assigned inside PG_TRY but read only after PG_END_TRY on the non-error
	 * path, so it does not need to be volatile.
	 */
	char	   *sync_slotname = NULL;

	Assert(am_tablesync_worker());

	PG_TRY();
	{
		/* Call initial sync. */
		sync_slotname = LogicalRepSyncTableStart(origin_startpos);
	}
	PG_CATCH();
	{
		if (MySubscription->disableonerr)
			DisableSubscriptionAndExit();
		else
		{
			/*
			 * Report the worker failed during table synchronization. Abort
			 * the current transaction so that the stats message is sent in an
			 * idle state.  (The "false" argument presumably flags this as a
			 * sync rather than apply error -- confirm against pgstat.)
			 */
			AbortOutOfAnyTransaction();
			pgstat_report_subscription_error(MySubscription->oid, false);

			PG_RE_THROW();
		}
	}
	PG_END_TRY();

	/* allocate slot name in long-lived context */
	*slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
	pfree(sync_slotname);
}
1677 :
1678 : /*
1679 : * Runs the tablesync worker.
1680 : *
1681 : * It starts syncing tables. After a successful sync, sets streaming options
1682 : * and starts streaming to catchup with apply worker.
1683 : */
1684 : static void
1685 342 : run_tablesync_worker()
1686 : {
1687 : char originname[NAMEDATALEN];
1688 342 : XLogRecPtr origin_startpos = InvalidXLogRecPtr;
1689 342 : char *slotname = NULL;
1690 : WalRcvStreamOptions options;
1691 :
1692 342 : start_table_sync(&origin_startpos, &slotname);
1693 :
1694 320 : ReplicationOriginNameForLogicalRep(MySubscription->oid,
1695 320 : MyLogicalRepWorker->relid,
1696 : originname,
1697 : sizeof(originname));
1698 :
1699 320 : set_apply_error_context_origin(originname);
1700 :
1701 320 : set_stream_options(&options, slotname, &origin_startpos);
1702 :
1703 320 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
1704 :
1705 : /* Apply the changes till we catchup with the apply worker. */
1706 320 : start_apply(origin_startpos);
1707 0 : }
1708 :
1709 : /* Logical Replication Tablesync worker entry point */
1710 : void
1711 342 : TablesyncWorkerMain(Datum main_arg)
1712 : {
1713 342 : int worker_slot = DatumGetInt32(main_arg);
1714 :
1715 342 : SetupApplyOrSyncWorker(worker_slot);
1716 :
1717 342 : run_tablesync_worker();
1718 :
1719 0 : finish_sync_worker();
1720 : }
1721 :
1722 : /*
1723 : * If the subscription has no tables then return false.
1724 : *
1725 : * Otherwise, are all tablesyncs READY?
1726 : *
1727 : * Note: This function is not suitable to be called from outside of apply or
1728 : * tablesync workers because MySubscription needs to be already initialized.
1729 : */
1730 : bool
1731 126 : AllTablesyncsReady(void)
1732 : {
1733 126 : bool started_tx = false;
1734 126 : bool has_subrels = false;
1735 :
1736 : /* We need up-to-date sync state info for subscription tables here. */
1737 126 : has_subrels = FetchTableStates(&started_tx);
1738 :
1739 126 : if (started_tx)
1740 : {
1741 22 : CommitTransactionCommand();
1742 22 : pgstat_report_stat(true);
1743 : }
1744 :
1745 : /*
1746 : * Return false when there are no tables in subscription or not all tables
1747 : * are in ready state; true otherwise.
1748 : */
1749 126 : return has_subrels && (table_states_not_ready == NIL);
1750 : }
1751 :
1752 : /*
1753 : * Update the two_phase state of the specified subscription in pg_subscription.
1754 : */
1755 : void
1756 10 : UpdateTwoPhaseState(Oid suboid, char new_state)
1757 : {
1758 : Relation rel;
1759 : HeapTuple tup;
1760 : bool nulls[Natts_pg_subscription];
1761 : bool replaces[Natts_pg_subscription];
1762 : Datum values[Natts_pg_subscription];
1763 :
1764 : Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
1765 : new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1766 : new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1767 :
1768 10 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1769 10 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
1770 10 : if (!HeapTupleIsValid(tup))
1771 0 : elog(ERROR,
1772 : "cache lookup failed for subscription oid %u",
1773 : suboid);
1774 :
1775 : /* Form a new tuple. */
1776 10 : memset(values, 0, sizeof(values));
1777 10 : memset(nulls, false, sizeof(nulls));
1778 10 : memset(replaces, false, sizeof(replaces));
1779 :
1780 : /* And update/set two_phase state */
1781 10 : values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1782 10 : replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1783 :
1784 10 : tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1785 : values, nulls, replaces);
1786 10 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1787 :
1788 10 : heap_freetuple(tup);
1789 10 : table_close(rel, RowExclusiveLock);
1790 10 : }
|